4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client # pylint: disable-msg=W0611
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
56 _RPC_CLIENT_HEADERS = [
57 "Content-type: %s" % http.HTTP_APP_JSON,
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
81 """Initializes the module-global HTTP client manager.
83 Must be called before using any RPC function and while exactly one thread is
87 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88 # one thread running. This check is just a safety measure -- it doesn't
90 assert threading.activeCount() == 1, \
91 "Found more than one active thread when initializing pycURL"
93 logging.info("Using PycURL %s", pycurl.version)
95 pycurl.global_init(pycurl.GLOBAL_ALL)
99 """Stops the module-global HTTP client manager.
101 Must be called before quitting the program and while exactly one thread is
105 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the RPC client settings to a cURL handle.

  The node daemon certificate file is used as CA bundle, as client
  certificate and as client key (all in PEM format). Peer verification is
  switched on, host name verification is switched off, redirects are not
  followed and the connect timeout is set to C{_RPC_CONNECT_TIMEOUT}.

  @param curl: handle whose C{setopt} method is called once per option

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Options are applied in exactly this order
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
127 class _RpcThreadLocal(_threading.local):
128 def GetHttpClientPool(self):
129 """Returns a per-thread HTTP client pool.
131 @rtype: L{http.client.HttpClientPool}
136 except AttributeError:
137 pool = http.client.HttpClientPool(_ConfigRpcCurl)
143 # Remove module alias (see above)
147 _thread_local = _RpcThreadLocal()
150 def _RpcTimeout(secs):
151 """Timeout decorator.
153 When applied to a rpc call_* function, it updates the global timeout
154 table with the given function/timeout.
159 assert name.startswith("call_")
160 _TIMEOUTS[name[len("call_"):]] = secs
166 """RPC-wrapper decorator.
168 When applied to a function, it runs it with the RPC system
169 initialized, and it shuts down the system afterwards. This means the
170 function must be called without RPC being initialized.
173 def wrapper(*args, **kwargs):
176 return fn(*args, **kwargs)
182 class RpcResult(object):
185 This class holds an RPC result. It is needed since in multi-node
186 calls we can't raise an exception just because one out of many
187 failed, and therefore we use this class to encapsulate the result.
189 @ivar data: the data payload, for successful results, or None
190 @ivar call: the name of the RPC call
191 @ivar node: the name of the node to which we made the call
192 @ivar offline: whether the operation failed because the node was
193 offline, as opposed to actual failure; offline=True will always
194 imply failed=True, in order to allow simpler checking if
195 the user doesn't care about the exact failure mode
196 @ivar fail_msg: the error message if the call failed
199 def __init__(self, data=None, failed=False, offline=False,
200 call=None, node=None):
201 self.offline = offline
206 self.fail_msg = "Node is marked offline"
207 self.data = self.payload = None
209 self.fail_msg = self._EnsureErr(data)
210 self.data = self.payload = None
213 if not isinstance(self.data, (tuple, list)):
214 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
218 self.fail_msg = ("RPC layer error: invalid result length (%d), "
219 "expected 2" % len(self.data))
221 elif not self.data[0]:
222 self.fail_msg = self._EnsureErr(self.data[1])
227 self.payload = data[1]
229 for attr_name in ["call", "data", "fail_msg",
230 "node", "offline", "payload"]:
231 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
235 """Helper to ensure we return a 'True' value for error."""
239 return "No error information"
241 def Raise(self, msg, prereq=False, ecode=None):
242 """If the result has failed, raise an OpExecError.
244 This is used so that LU code doesn't have to check for each
245 result, but instead can call this function.
248 if not self.fail_msg:
251 if not msg: # one could pass None for default message
252 msg = ("Call '%s' to node '%s' has failed: %s" %
253 (self.call, self.node, self.fail_msg))
255 msg = "%s: %s" % (msg, self.fail_msg)
257 ec = errors.OpPrereqError
259 ec = errors.OpExecError
260 if ecode is not None:
264 raise ec(*args) # pylint: disable-msg=W0142
267 def _AddressLookup(node_list,
268 ssc=ssconf.SimpleStore,
269 nslookup_fn=netutils.Hostname.GetIP):
270 """Return addresses for given node names.
272 @type node_list: list
273 @param node_list: List of node names
275 @param ssc: SimpleStore class that is used to obtain node->ip mappings
276 @type nslookup_fn: callable
277 @param nslookup_fn: function used to do NS lookup
278 @rtype: list of addresses and/or None's
279 @returns: List of corresponding addresses, if found
283 iplist = ss.GetNodePrimaryIPList()
284 family = ss.GetPrimaryIPFamily()
286 ipmap = dict(entry.split() for entry in iplist)
287 for node in node_list:
288 address = ipmap.get(node)
290 address = nslookup_fn(node, family=family)
291 addresses.append(address)
299 This class, given a (remote) method name, a list of parameters and a
300 list of nodes, will contact (in parallel) all nodes, and return a
301 dict of results (key: node name, value: result).
303 One current bug is that generic failure is still signaled by
304 'False' result, which is not good. This overloading of values can
308 def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311 self.procedure = procedure
315 self._address_lookup_fn = address_lookup_fn
317 def ConnectList(self, node_list, address_list=None, read_timeout=None):
318 """Add a list of nodes to the target nodes.
320 @type node_list: list
321 @param node_list: the list of node names to connect
322 @type address_list: list or None
323 @keyword address_list: either None or a list with node addresses,
324 which must have the same length as the node list
325 @type read_timeout: int
326 @param read_timeout: overwrites default timeout for operation
329 if address_list is None:
330 # Always use IP address instead of node name
331 address_list = self._address_lookup_fn(node_list)
333 assert len(node_list) == len(address_list), \
334 "Name and address lists must have the same length"
336 for node, address in zip(node_list, address_list):
337 self.ConnectNode(node, address, read_timeout=read_timeout)
339 def ConnectNode(self, name, address=None, read_timeout=None):
340 """Add a node to the target list.
343 @param name: the node name
345 @param address: the node address, if known
346 @type read_timeout: int
347 @param read_timeout: overwrites default timeout for operation
351 # Always use IP address instead of node name
352 address = self._address_lookup_fn([name])[0]
354 assert(address is not None)
356 if read_timeout is None:
357 read_timeout = _TIMEOUTS[self.procedure]
359 self._request[name] = \
360 http.client.HttpClientRequest(str(address), self.port,
361 http.HTTP_PUT, str("/%s" % self.procedure),
362 headers=_RPC_CLIENT_HEADERS,
363 post_data=str(self.body),
364 read_timeout=read_timeout)
366 def GetResults(self, http_pool=None):
367 """Call nodes and return results.
370 @return: List of RPC results
374 http_pool = _thread_local.GetHttpClientPool()
376 http_pool.ProcessRequests(self._request.values())
380 for name, req in self._request.iteritems():
381 if req.success and req.resp_status_code == http.HTTP_OK:
382 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383 node=name, call=self.procedure)
386 # TODO: Better error reporting
392 logging.error("RPC error in %s from node %s: %s",
393 self.procedure, name, msg)
394 results[name] = RpcResult(data=msg, failed=True, node=name,
400 def _EncodeImportExportIO(ieio, ieioargs):
401 """Encodes import/export I/O information.
404 if ieio == constants.IEIO_RAW_DISK:
405 assert len(ieioargs) == 1
406 return (ieioargs[0].ToDict(), )
408 if ieio == constants.IEIO_SCRIPT:
409 assert len(ieioargs) == 2
410 return (ieioargs[0].ToDict(), ieioargs[1])
415 class RpcRunner(object):
416 """RPC runner class"""
418 def __init__(self, cfg):
419 """Initialized the rpc runner.
421 @type cfg: C{config.ConfigWriter}
422 @param cfg: the configuration object that will be used to get data
427 self.port = netutils.GetDaemonPort(constants.NODED)
429 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430 """Convert the given instance to a dict.
432 This is done via the instance's ToDict() method and additionally
433 we fill the hvparams with the cluster defaults.
435 @type instance: L{objects.Instance}
436 @param instance: an Instance object
437 @type hvp: dict or None
438 @param hvp: a dictionary with overridden hypervisor parameters
439 @type bep: dict or None
440 @param bep: a dictionary with overridden backend parameters
441 @type osp: dict or None
442 @param osp: a dictionary with overridden os parameters
444 @return: the instance dict, with the hvparams filled with the
448 idict = instance.ToDict()
449 cluster = self._cfg.GetClusterInfo()
450 idict["hvparams"] = cluster.FillHV(instance)
452 idict["hvparams"].update(hvp)
453 idict["beparams"] = cluster.FillBE(instance)
455 idict["beparams"].update(bep)
456 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458 idict["osparams"].update(osp)
459 for nic in idict["nics"]:
460 nic['nicparams'] = objects.FillDict(
461 cluster.nicparams[constants.PP_DEFAULT],
465 def _ConnectList(self, client, node_list, call, read_timeout=None):
466 """Helper for computing node addresses.
468 @type client: L{ganeti.rpc.Client}
469 @param client: a C{Client} instance
470 @type node_list: list
471 @param node_list: the node list we should connect
473 @param call: the name of the remote procedure call, for filling in
474 correctly any eventual offline nodes' results
475 @type read_timeout: int
476 @param read_timeout: overwrites the default read timeout for the
480 all_nodes = self._cfg.GetAllNodesInfo()
484 for node in node_list:
485 if node in all_nodes:
486 if all_nodes[node].offline:
487 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489 val = all_nodes[node].primary_ip
492 addr_list.append(val)
493 name_list.append(node)
495 client.ConnectList(name_list, address_list=addr_list,
496 read_timeout=read_timeout)
499 def _ConnectNode(self, client, node, call, read_timeout=None):
500 """Helper for computing one node's address.
502 @type client: L{ganeti.rpc.Client}
503 @param client: a C{Client} instance
505 @param node: the node we should connect
507 @param call: the name of the remote procedure call, for filling in
508 correctly any eventual offline nodes' results
509 @type read_timeout: int
510 @param read_timeout: overwrites the default read timeout for the
514 node_info = self._cfg.GetNodeInfo(node)
515 if node_info is not None:
516 if node_info.offline:
517 return RpcResult(node=node, offline=True, call=call)
518 addr = node_info.primary_ip
521 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524 """Helper for making a multi-node call
527 body = serializer.DumpJson(args, indent=False)
528 c = Client(procedure, body, self.port)
529 skip_dict = self._ConnectList(c, node_list, procedure,
530 read_timeout=read_timeout)
531 skip_dict.update(c.GetResults())
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
  """Runs a procedure on several nodes without using the configuration.

  The arguments are serialized to JSON, a L{Client} for the node daemon
  port is created, all nodes are connected and the results are collected.

  @param node_list: names of the nodes to contact
  @param procedure: name of the remote procedure
  @param args: arguments of the procedure, serialized via
      C{serializer.DumpJson}
  @param address_list: passed on to C{Client.ConnectList}
  @param read_timeout: passed on to C{Client.ConnectList}
  @return: whatever C{Client.GetResults} returns

  """
  payload = serializer.DumpJson(args, indent=False)
  noded_port = netutils.GetDaemonPort(constants.NODED)

  client = Client(procedure, payload, noded_port)
  client.ConnectList(node_list, address_list=address_list,
                     read_timeout=read_timeout)

  return client.GetResults()
546 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547 """Helper for making a single-node call
550 body = serializer.DumpJson(args, indent=False)
551 c = Client(procedure, body, self.port)
552 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554 # we did connect, node is not offline
555 result = c.GetResults()[node]
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
  """Runs a procedure on one node without using the configuration.

  @param node: name of the node to contact
  @param procedure: name of the remote procedure
  @param args: arguments of the procedure, serialized via
      C{serializer.DumpJson}
  @param read_timeout: passed on to C{Client.ConnectNode}
  @return: the entry for C{node} in the result of C{Client.GetResults}

  """
  payload = serializer.DumpJson(args, indent=False)
  noded_port = netutils.GetDaemonPort(constants.NODED)

  client = Client(procedure, payload, noded_port)
  client.ConnectNode(node, read_timeout=read_timeout)

  all_results = client.GetResults()
  return all_results[node]
570 """Compresses a string for transport over RPC.
572 Small amounts of data are not compressed.
577 @return: Encoded data to send
580 # Small amounts of data are not compressed
582 return (constants.RPC_ENCODING_NONE, data)
584 # Compress with zlib and encode in base64
585 return (constants.RPC_ENCODING_ZLIB_BASE64,
586 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_bdev_sizes(self, node_list, devices):
  """Queries the sizes of the requested block devices.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param devices: the block devices to ask for; sent as the only argument

  """
  rpc_args = [devices]
  return self._MultiNodeCall(node_list, "bdev_sizes", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Queries the logical volumes present in a volume group.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param vg_name: the volume group name; sent as the only argument

  """
  rpc_args = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Queries the volume group list.

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "vg_list", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Queries the list of storage units.

  This is a multi-node call. The arguments after C{node_list} are sent
  unchanged, in signature order.

  @param node_list: the nodes to query

  """
  rpc_args = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Requests the modification of a storage unit.

  This is a single-node call. The arguments after C{node} are sent
  unchanged, in signature order.

  @param node: the node to contact

  """
  rpc_args = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Requests the execution of an operation on a storage unit.

  This is a single-node call. The arguments after C{node} are sent
  unchanged, in signature order.

  @param node: the node to contact

  """
  rpc_args = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Asks a node whether all the given bridges exist on it.

  The check tells whether an instance using interfaces on those bridges
  could be started on the remote node.

  This is a single-node call.

  @param node: the node to contact
  @param bridges_list: the bridges to look for; sent as the only argument

  """
  rpc_args = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
  """Requests the start of an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance; converted with C{_InstDict} before sending
  @param hvp: passed to C{_InstDict} as C{hvp}
  @param bep: passed to C{_InstDict} as C{bep}

  """
  rpc_args = [self._InstDict(instance, hvp=hvp, bep=bep)]
  return self._SingleNodeCall(node, "instance_start", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Requests the shutdown of an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance; converted with C{_InstDict} before sending
  @param timeout: sent unchanged as second argument

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_shutdown",
                              [inst_dict, timeout])
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Collects the information needed to prepare an instance migration.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition; converted with C{_InstDict}

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "migration_info", [inst_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepares a node for accepting an instance.

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition; converted with C{_InstDict}
  @type info: opaque/hypervisor specific (string/data)
  @param info: result of the call_migration_info call
  @param target: target hostname (usually ip address) (on the node itself)

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, info, target]
  return self._SingleNodeCall(node, "accept_instance", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
  """Finalizes the target-node specific part of a migration.

  To be called after a successful migration as well as after a failed one
  (in which case the migration should be aborted).

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition; converted with C{_InstDict}
  @type info: opaque/hypervisor specific (string/data)
  @param info: result of the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, info, success]
  return self._SingleNodeCall(node, "finalize_migration", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Requests the migration of an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition; converted with C{_InstDict}
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, target, live]
  return self._SingleNodeCall(node, "instance_migrate", rpc_args)
758 @_RpcTimeout(_TMO_NORMAL)
759 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
760 """Reboots an instance.
762 This is a single-node call.
765 return self._SingleNodeCall(node, "instance_reboot",
766 [self._InstDict(inst), reboot_type,
769 @_RpcTimeout(_TMO_1DAY)
770 def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
771 """Installs an OS on the given instance.
773 This is a single-node call.
776 return self._SingleNodeCall(node, "instance_os_add",
777 [self._InstDict(inst, osp=osparams),
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Requests a run of the OS rename script for an instance.

  This is a single-node call.

  @param node: the node to contact
  @param inst: the instance; converted with C{_InstDict} before sending
  @param old_name: sent unchanged as second argument
  @param debug: sent unchanged as third argument

  """
  inst_dict = self._InstDict(inst)
  rpc_args = [inst_dict, old_name, debug]
  return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Queries information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance

  """
  rpc_args = [instance, hname]
  return self._SingleNodeCall(node, "instance_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Asks a node whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check; converted with C{_InstDict}

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_migratable", [inst_dict])
821 @_RpcTimeout(_TMO_URGENT)
822 def call_all_instances_info(self, node_list, hypervisor_list):
823 """Returns information about all instances on the given nodes.
825 This is a multi-node call.
827 @type node_list: list
828 @param node_list: the list of nodes to query
829 @type hypervisor_list: list
830 @param hypervisor_list: the hypervisors to query for instances
833 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Queries the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  rpc_args = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", rpc_args)
850 @_RpcTimeout(_TMO_FAST)
851 def call_node_tcp_ping(self, node, source, target, port, timeout,
853 """Do a TcpPing on the remote node
855 This is a single-node call.
858 return self._SingleNodeCall(node, "node_tcp_ping",
859 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Asks a node whether it has the given IP address.

  This is a single-node call.

  @param node: the node to query
  @param address: the IP address to look for; sent as the only argument

  """
  rpc_args = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Queries node information.

  According to the original documentation the answer carries memory
  information as well as volume group size and free space.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask

  """
  rpc_args = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
  """Requests a modification of the hosts file for a name.

  This is a single-node call.

  @param node: The node to call
  @param mode: The mode to operate. Currently "add" or "remove"
  @param name: The host name to be modified
  @param ip: The ip of the entry (just valid if mode is "add")

  """
  rpc_args = [mode, name, ip]
  return self._SingleNodeCall(node, "etc_hosts_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Requests the verification of the given parameters.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param checkdict: sent unchanged as first argument
  @param cluster_name: sent unchanged as second argument

  """
  rpc_args = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tells a node to activate itself as a master.

  This is a single-node call made through C{_StaticSingleNodeCall}.

  @param node: the node to contact
  @param start_daemons: sent unchanged as first argument
  @param no_voting: sent unchanged as second argument

  """
  rpc_args = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
  """Tells a node to demote itself from master status.

  This is a single-node call made through C{_StaticSingleNodeCall}.

  @param node: the node to contact
  @param stop_daemons: sent unchanged as the only argument

  """
  rpc_args = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
  """Queries the master information.

  This is a multi-node call made through C{_StaticMultiNodeCall}; the
  remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  # TODO: should this method query down nodes?
  no_args = []
  return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
  """Queries the node version.

  This is a multi-node call made through C{_StaticMultiNodeCall}; the
  remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return cls._StaticMultiNodeCall(node_list, "version", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Requests the creation of a block device.

  This is a single-node call. All arguments after C{bdev} are sent
  unchanged, in signature order.

  @param node: the node to contact
  @param bdev: the device; its C{ToDict()} result is sent

  """
  rpc_args = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
  """Requests a wipe of a block device at an offset with a given size.

  This is a single-node call.

  @param node: the node to contact
  @param bdev: the device; its C{ToDict()} result is sent
  @param offset: sent unchanged as second argument
  @param size: sent unchanged as third argument

  """
  rpc_args = [bdev.ToDict(), offset, size]
  return self._SingleNodeCall(node, "blockdev_wipe", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Requests the removal of a block device.

  This is a single-node call.

  @param node: the node to contact
  @param bdev: the device; its C{ToDict()} result is the only argument

  """
  rpc_args = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Requests the rename of the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param devlist: iterable of C{(device, uid)} pairs; each device is
      replaced by its C{ToDict()} result and the resulting list of pairs
      is used directly as the argument list

  """
  renames = [(dev.ToDict(), new_uid) for (dev, new_uid) in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", renames)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_pause_resume_sync(self, node, disks, pause):
  """Requests a pause/resume of the sync of the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param disks: the devices; a list of their C{ToDict()} results is sent
  @param pause: sent unchanged as second argument

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [disk_dicts, pause]
  return self._SingleNodeCall(node, "blockdev_pause_resume_sync", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
  """Requests the assembly of a block device.

  This is a single-node call. All arguments after C{disk} are sent
  unchanged, in signature order.

  @param node: the node to contact
  @param disk: the device; its C{ToDict()} result is sent

  """
  rpc_args = [disk.ToDict(), owner, on_primary, idx]
  return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Requests the shutdown of a block device.

  This is a single-node call.

  @param node: the node to contact
  @param disk: the device; its C{ToDict()} result is the only argument

  """
  rpc_args = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
1029 @_RpcTimeout(_TMO_NORMAL)
1030 def call_blockdev_addchildren(self, node, bdev, ndevs):
1031 """Request adding a list of children to a (mirroring) device.
1033 This is a single-node call.
1036 return self._SingleNodeCall(node, "blockdev_addchildren",
1038 [disk.ToDict() for disk in ndevs]])
1040 @_RpcTimeout(_TMO_NORMAL)
1041 def call_blockdev_removechildren(self, node, bdev, ndevs):
1042 """Request removing a list of children from a (mirroring) device.
1044 This is a single-node call.
1047 return self._SingleNodeCall(node, "blockdev_removechildren",
1049 [disk.ToDict() for disk in ndevs]])
1051 @_RpcTimeout(_TMO_NORMAL)
1052 def call_blockdev_getmirrorstatus(self, node, disks):
1053 """Request status of a (mirroring) device.
1055 This is a single-node call.
1058 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1059 [dsk.ToDict() for dsk in disks])
1060 if not result.fail_msg:
1061 result.payload = [objects.BlockDevStatus.FromDict(i)
1062 for i in result.payload]
1065 @_RpcTimeout(_TMO_NORMAL)
1066 def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1067 """Request status of (mirroring) devices from multiple nodes.
1069 This is a multi-node call.
1072 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1073 [dict((name, [dsk.ToDict() for dsk in disks])
1074 for name, disks in node_disks.items())])
1075 for nres in result.values():
1079 for idx, (success, status) in enumerate(nres.payload):
1081 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1085 @_RpcTimeout(_TMO_NORMAL)
1086 def call_blockdev_find(self, node, disk):
1087 """Request identification of a given block device.
1089 This is a single-node call.
1092 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1093 if not result.fail_msg and result.payload is not None:
1094 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Requests closing of the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param instance_name: sent unchanged as first argument
  @param disks: the devices; a list of their C{ToDict()} results is sent

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Queries the size of the given disks.

  This is a single-node call.

  @param node: the node to query
  @param disks: the devices; a list of their C{ToDict()} results is the
      only argument

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Requests the network disconnect of the given drbd devices.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: sent unchanged as first argument
  @param disks: the devices; a list of their C{ToDict()} results is sent

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Issues the C{drbd_attach_net} request for the given drbd devices.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: sent unchanged as first argument
  @param disks: the devices; a list of their C{ToDict()} results is sent
  @param instance_name: sent unchanged as third argument
  @param multimaster: sent unchanged as fourth argument

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts, instance_name, multimaster]
  return self._MultiNodeCall(node_list, "drbd_attach_net", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of the drbd devices is complete.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: sent unchanged as first argument
  @param disks: the devices; a list of their C{ToDict()} results is sent

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts]
  return self._MultiNodeCall(node_list, "drbd_wait_sync", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Queries the drbd helper.

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "drbd_helper", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Uploads a local file to the given nodes.

  The file is read, encoded with C{_Compress} and sent together with its
  mode, owner, group, access time and modification time as reported by
  C{os.stat}. The receiving node may refuse the operation.

  This is a multi-node call made through C{_StaticMultiNodeCall}.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed

  """
  # The file is read before it is stat'ed, as in the original sequence
  contents = utils.ReadFile(file_name)
  encoded = cls._Compress(contents)
  stat_info = os.stat(file_name)

  rpc_args = [
    file_name,
    encoded,
    stat_info.st_mode,
    stat_info.st_uid,
    stat_info.st_gid,
    stat_info.st_atime,
    stat_info.st_mtime,
    ]

  return cls._StaticMultiNodeCall(node_list, "upload_file", rpc_args,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Requests writing of the ssconf files.

  This is a multi-node call made through C{_StaticMultiNodeCall}.

  @param node_list: the nodes to contact
  @param values: sent unchanged as the only argument

  """
  rpc_args = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
  """Issues the C{run_oob} request on a node.

  This is a single-node call. All arguments after C{node} are sent
  unchanged, in signature order.

  @param node: the node to contact

  """
  rpc_args = [oob_program, command, remote_node, timeout]
  return self._SingleNodeCall(node, "run_oob", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Requests a diagnose of the OS definitions.

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  When the call did not fail and the payload is a dict, the payload is
  replaced in place by the object built with C{objects.OS.FromDict}.

  @param node: the node to contact
  @param name: sent as the single RPC argument
  @return: the RPC result, with its payload possibly converted as
      described above

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  if result.fail_msg:
    # Failed calls are handed back untouched
    return result
  if isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Run a validation routine for a given OS.

  This is a multi-node call.

  @param required: sent as the first RPC argument
  @param nodes: the nodes to contact
  @param name: sent as the second RPC argument
  @param checks: sent as the third RPC argument
  @param params: sent as the fourth RPC argument

  """
  validate_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", validate_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  Args:
    - hpath: forwarded as the first RPC argument
    - phase: forwarded as the second RPC argument
    - env: a dictionary with the environment

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "hooks_runner", [hpath, phase, env])
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  Args:
    - name: the iallocator name
    - idata: the json-encoded input string

  This is a single-node call.

  """
  runner_args = [name, idata]
  return self._SingleNodeCall(node, "iallocator_runner", runner_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
  """Request growing the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the block device; sent as C{cf_bdev.ToDict()}
  @param amount: forwarded as the second RPC argument
  @param dryrun: forwarded as the third RPC argument

  """
  grow_args = [cf_bdev.ToDict(), amount, dryrun]
  return self._SingleNodeCall(node, "blockdev_grow", grow_args)
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a given disk to another node.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the disk to export; sent as C{cf_bdev.ToDict()}
  @param dest_node: forwarded as the second RPC argument
  @param dest_path: forwarded as the third RPC argument
  @param cluster_name: forwarded as the fourth RPC argument

  """
  export_args = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
  return self._SingleNodeCall(node, "blockdev_export", export_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the block device; sent as C{cf_bdev.ToDict()}

  """
  snapshot_args = [cf_bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_snapshot", snapshot_args)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance; encoded with C{self._InstDict}
  @param snap_disks: iterable whose entries are either booleans (sent
      unchanged) or objects providing C{ToDict()} (sent serialized)

  """
  flat_disks = []
  for disk in snap_disks:
    if isinstance(disk, bool):
      # Boolean placeholders have no ToDict(); pass them through as they are
      flat_disks.append(disk)
    else:
      flat_disks.append(disk.ToDict())

  return self._SingleNodeCall(node, "finalize_export",
                              [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  @param node: the node to contact
  @param path: sent as the single RPC argument

  """
  info_args = [path]
  return self._SingleNodeCall(node, "export_info", info_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Gets the stored exports list.

  This is a multi-node call.

  @param node_list: the nodes to contact; the RPC carries no arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Requests removal of a given export.

  This is a single-node call.

  @param node: the node to contact
  @param export: sent as the single RPC argument

  """
  remove_args = [export]
  return self._SingleNodeCall(node, "export_remove", remove_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  The node is asked to remove the ganeti configuration information it
  holds.

  This is a single-node call.

  @param node: the node to contact
  @param modify_ssh_setup: sent as the single RPC argument

  """
  leave_args = [modify_ssh_setup]
  return cls._StaticSingleNodeCall(node, "node_leave_cluster", leave_args)
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call.

  @param node_list: the nodes to contact; the RPC carries no arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call.

  @param node: the node to contact; the RPC carries no arguments

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Tries to powercycle a node.

  This is a single-node call.

  @param node: the node to contact
  @param hypervisor: sent as the single RPC argument

  """
  powercycle_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", powercycle_args)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  The request is issued with a read timeout of C{int(duration + 5)}.

  @param node_list: the nodes to contact
  @param duration: sent as the single RPC argument

  """
  wait_limit = int(duration + 5)
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=wait_limit)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: sent as the single RPC argument

  """
  create_args = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_create", create_args)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: sent as the single RPC argument

  """
  remove_args = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_remove", remove_args)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param old_file_storage_dir: sent as the first RPC argument
  @param new_file_storage_dir: sent as the second RPC argument

  """
  rename_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rename_args)
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: passed on as the C{address_list} keyword of the
      underlying call
  @param file_name: sent as the first RPC argument
  @param content: passed through C{cls._Compress} and sent as the second
      RPC argument

  """
  update_args = [file_name, cls._Compress(content)]
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update", update_args,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Invoke the C{jobqueue_purge} procedure on a node.

  This is a single-node call.

  @param node: the node to contact; the RPC carries no arguments

  """
  no_args = []
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: passed on as the C{address_list} keyword of the
      underlying call
  @param rename: used directly as the RPC argument list (it is not
      wrapped in another list)

  """
  rpc_args = rename
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rpc_args,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  The given parameters are layered over the cluster-level parameters of
  the same hypervisor (an empty dict if the cluster has none) using
  C{objects.FillDict}, and the filled result is what gets sent.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  cluster_defaults = cluster.hvparams.get(hvname, {})
  hv_full = objects.FillDict(cluster_defaults, hvparams)
  # NOTE(review): argument order (hypervisor name, then the filled
  # parameters) should be confirmed against the node daemon's handler
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Creates a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: the node to contact
  @param validity: Validity in seconds

  """
  create_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", create_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Removes a X509 certificate.

  This is a single-node call.

  @param node: the node to contact
  @param name: Certificate name

  """
  remove_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", remove_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @param node: Node name
  @param opts: import options; sent as C{opts.ToDict()}
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: sent unchanged and also used to select how C{dest_args}
      is encoded
  @param dest_args: encoded via C{_EncodeImportExportIO(dest, dest_args)}

  """
  import_args = [
    opts.ToDict(),
    self._InstDict(instance),
    dest,
    _EncodeImportExportIO(dest, dest_args),
    ]
  return self._SingleNodeCall(node, "import_start", import_args)
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @param node: Node name
  @param opts: export options; sent as C{opts.ToDict()}
  @param host: forwarded as the second RPC argument
  @param port: forwarded as the third RPC argument
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: sent unchanged and also used to select how
      C{source_args} is encoded
  @param source_args: encoded via
      C{_EncodeImportExportIO(source, source_args)}

  """
  export_args = [
    opts.ToDict(),
    host,
    port,
    self._InstDict(instance),
    source,
    _EncodeImportExportIO(source, source_args),
    ]
  return self._SingleNodeCall(node, "export_start", export_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  On success the payload is replaced by a list in which every non-None
  entry has been converted with C{objects.ImportExportStatus.FromDict};
  a failed result is returned with its payload untouched.

  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @rtype: List of L{objects.ImportExportStatus} instances
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  if not result.fail_msg:
    decoded = []

    for entry in result.payload:
      if entry is None:
        # No status could be retrieved for this name; keep the placeholder
        decoded.append(None)
      else:
        decoded.append(objects.ImportExportStatus.FromDict(entry))

    result.payload = decoded

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Aborts an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  abort_args = [name]
  return self._SingleNodeCall(node, "impexp_abort", abort_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Cleans up after an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  cleanup_args = [name]
  return self._SingleNodeCall(node, "impexp_cleanup", cleanup_args)