4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client # pylint: disable-msg=W0611
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
56 _RPC_CLIENT_HEADERS = [
57 "Content-type: %s" % http.HTTP_APP_JSON,
# Various time constants for the timeout table; the values are in seconds
# and are the ones handed to the _RpcTimeout decorator further down
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
81 """Initializes the module-global HTTP client manager.
83 Must be called before using any RPC function and while exactly one thread is
87 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88 # one thread running. This check is just a safety measure -- it doesn't
90 assert threading.activeCount() == 1, \
91 "Found more than one active thread when initializing pycURL"
93 logging.info("Using PycURL %s", pycurl.version)
95 pycurl.global_init(pycurl.GLOBAL_ALL)
99 """Stops the module-global HTTP client manager.
101 Must be called before quitting the program and while exactly one thread is
105 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the RPC client settings to a cURL handle.

  @param curl: object offering a pycURL-style C{setopt(option, value)} method

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # One PEM file serves as CA bundle, client certificate and client key
  rpc_options = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    # Host name verification is off, peer verification stays on
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in rpc_options:
    curl.setopt(option, value)
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
127 class _RpcThreadLocal(_threading.local):
128 def GetHttpClientPool(self):
129 """Returns a per-thread HTTP client pool.
131 @rtype: L{http.client.HttpClientPool}
136 except AttributeError:
137 pool = http.client.HttpClientPool(_ConfigRpcCurl)
143 # Remove module alias (see above)
147 _thread_local = _RpcThreadLocal()
def _RpcTimeout(secs):
  """Timeout decorator.

  When applied to a rpc call_* function, it updates the global timeout
  table with the given function/timeout.

  @param secs: timeout to store in the table for the decorated call
  @return: a decorator that registers the function and returns it unchanged

  """
  def decorator(f):
    name = f.__name__
    assert name.startswith("call_")
    # The table is keyed by the procedure name, i.e. without the prefix
    _TIMEOUTS[name[len("call_"):]] = secs
    return f
  return decorator
166 """RPC-wrapper decorator.
168 When applied to a function, it runs it with the RPC system
initialized, and it shuts down the system afterwards. This means the
170 function must be called without RPC being initialized.
173 def wrapper(*args, **kwargs):
176 return fn(*args, **kwargs)
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; an offline result always
      has C{fail_msg} set, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed, None otherwise
  @ivar payload: the second element of a successful result, or None

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # A well-formed result is a (status, payload) pair
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    assert hasattr(self, "call")
    assert hasattr(self, "data")
    assert hasattr(self, "fail_msg")
    assert hasattr(self, "node")
    assert hasattr(self, "offline")
    assert hasattr(self, "payload")

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: prefix for the error message; a false value selects a
        default message built from the call and node names
    @param prereq: if true, L{errors.OpPrereqError} is raised instead
    @param ecode: optional second argument for the exception

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable-msg=W0142
def _AddressLookup(node_list,
                   ssc=ssconf.SimpleStore,
                   nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of addresses and/or None's
  @returns: List of corresponding addresses, if found

  """
  ss = ssc()
  iplist = ss.GetNodePrimaryIPList()
  family = ss.GetPrimaryIPFamily()
  addresses = []
  # Each entry is split on whitespace into a (name, address) pair
  ipmap = dict(entry.split() for entry in iplist)
  for node in node_list:
    address = ipmap.get(node)
    if address is None:
      # Not in the stored list, fall back to a name lookup
      address = nslookup_fn(node, family=family)
    addresses.append(address)

  return addresses
302 This class, given a (remote) method name, a list of parameters and a
303 list of nodes, will contact (in parallel) all nodes, and return a
304 dict of results (key: node name, value: result).
306 One current bug is that generic failure is still signaled by
307 'False' result, which is not good. This overloading of values can
def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
  """Prepares an RPC client for one procedure.

  @param procedure: name of the remote procedure; must be present in the
      timeout table
  @param body: request body sent to every node
  @param port: port used for the requests
  @param address_lookup_fn: callable mapping a list of node names to a
      list of addresses

  """
  assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                  " timeouts table")
  self.procedure = procedure
  self.body = body
  self.port = port
  # Pending requests, keyed by node name
  self._request = {}
  self._address_lookup_fn = address_lookup_fn
def ConnectList(self, node_list, address_list=None, read_timeout=None):
  """Add a list of nodes to the target nodes.

  @type node_list: list
  @param node_list: the list of node names to connect
  @type address_list: list or None
  @keyword address_list: either None or a list with node addresses,
      which must have the same length as the node list
  @type read_timeout: int
  @param read_timeout: overwrites default timeout for operation

  """
  if address_list is None:
    # Always use IP address instead of node name
    address_list = self._address_lookup_fn(node_list)

  assert len(node_list) == len(address_list), \
         "Name and address lists must have the same length"

  for node, address in zip(node_list, address_list):
    self.ConnectNode(node, address, read_timeout=read_timeout)
def ConnectNode(self, name, address=None, read_timeout=None):
  """Add a node to the target list.

  @param name: the node name
  @param address: the node address, if known
  @type read_timeout: int
  @param read_timeout: overwrites default timeout for operation

  """
  if address is None:
    # Always use IP address instead of node name
    address = self._address_lookup_fn([name])[0]

  assert(address is not None)

  if read_timeout is None:
    # Fall back to the timeout registered for this procedure
    read_timeout = _TIMEOUTS[self.procedure]

  self._request[name] = \
    http.client.HttpClientRequest(str(address), self.port,
                                  http.HTTP_PUT, str("/%s" % self.procedure),
                                  headers=_RPC_CLIENT_HEADERS,
                                  post_data=str(self.body),
                                  read_timeout=read_timeout)
369 def GetResults(self, http_pool=None):
370 """Call nodes and return results.
@return: Dictionary of RPC results, keyed by node name
377 http_pool = _thread_local.GetHttpClientPool()
379 http_pool.ProcessRequests(self._request.values())
383 for name, req in self._request.iteritems():
384 if req.success and req.resp_status_code == http.HTTP_OK:
385 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386 node=name, call=self.procedure)
389 # TODO: Better error reporting
395 logging.error("RPC error in %s from node %s: %s",
396 self.procedure, name, msg)
397 results[name] = RpcResult(data=msg, failed=True, node=name,
def _EncodeImportExportIO(ieio, ieioargs):
  """Encodes import/export I/O information.

  @param ieio: I/O mode, compared against the C{constants.IEIO_*} values
  @param ieioargs: arguments belonging to the mode
  @return: the arguments with their first element converted via C{ToDict()}
      for the raw-disk and script modes

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieioargs[0].ToDict(), )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieioargs[0].ToDict(), ieioargs[1])

  # NOTE(review): fall-through restored as a pass-through of the arguments;
  # confirm against the upstream file
  return ieioargs
418 class RpcRunner(object):
419 """RPC runner class"""
def __init__(self, cfg):
  """Initialize the rpc runner.

  @type cfg: C{config.ConfigWriter}
  @param cfg: the configuration object that will be used to get data
      about the cluster

  """
  self._cfg = cfg
  self.port = netutils.GetDaemonPort(constants.NODED)
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
  """Convert the given instance to a dict.

  This is done via the instance's ToDict() method and additionally
  we fill the hvparams, beparams, osparams and per-NIC nicparams with
  the cluster defaults.

  @type instance: L{objects.Instance}
  @param instance: an Instance object
  @type hvp: dict or None
  @param hvp: a dictionary with overridden hypervisor parameters
  @type bep: dict or None
  @param bep: a dictionary with overridden backend parameters
  @type osp: dict or None
  @param osp: a dictionary with overridden os parameters
  @rtype: dict
  @return: the instance dict, with the parameters filled with the
      cluster defaults

  """
  idict = instance.ToDict()
  cluster = self._cfg.GetClusterInfo()
  idict["hvparams"] = cluster.FillHV(instance)
  if hvp is not None:
    idict["hvparams"].update(hvp)
  idict["beparams"] = cluster.FillBE(instance)
  if bep is not None:
    idict["beparams"].update(bep)
  idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
  if osp is not None:
    idict["osparams"].update(osp)
  for nic in idict["nics"]:
    nic['nicparams'] = objects.FillDict(
      cluster.nicparams[constants.PP_DEFAULT],
      nic['nicparams'])
  return idict
def _ConnectList(self, client, node_list, call, read_timeout=None):
  """Helper for computing node addresses.

  @type client: L{ganeti.rpc.Client}
  @param client: a C{Client} instance
  @type node_list: list
  @param node_list: the node list we should connect
  @param call: the name of the remote procedure call, for filling in
      correctly any eventual offline nodes' results
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the
      given operation
  @rtype: dict
  @return: offline results (L{RpcResult}) for the nodes that were skipped,
      keyed by node name

  """
  all_nodes = self._cfg.GetAllNodesInfo()
  name_list = []
  addr_list = []
  skip_dict = {}

  for node in node_list:
    if node in all_nodes:
      if all_nodes[node].offline:
        skip_dict[node] = RpcResult(node=node, offline=True, call=call)
        continue
      val = all_nodes[node].primary_ip
    else:
      # Unknown node, the client resolves the address itself
      val = None
    addr_list.append(val)
    name_list.append(node)
  if name_list:
    client.ConnectList(name_list, address_list=addr_list,
                       read_timeout=read_timeout)
  return skip_dict
def _ConnectNode(self, client, node, call, read_timeout=None):
  """Helper for computing one node's address.

  @type client: L{ganeti.rpc.Client}
  @param client: a C{Client} instance
  @param node: the node we should connect
  @param call: the name of the remote procedure call, for filling in
      correctly any eventual offline nodes' results
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the
      given operation
  @return: an offline L{RpcResult} if the node is marked offline,
      otherwise None after the node was added to the client

  """
  node_info = self._cfg.GetNodeInfo(node)
  if node_info is not None:
    if node_info.offline:
      return RpcResult(node=node, offline=True, call=call)
    addr = node_info.primary_ip
  else:
    # Unknown node, the client resolves the address itself
    addr = None
  client.ConnectNode(node, address=addr, read_timeout=read_timeout)
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
  """Helper for making a multi-node call

  @return: dict of results keyed by node name; offline nodes get an
      offline result without being contacted

  """
  body = serializer.DumpJson(args, indent=False)
  c = Client(procedure, body, self.port)
  skip_dict = self._ConnectList(c, node_list, procedure,
                                read_timeout=read_timeout)
  skip_dict.update(c.GetResults())
  return skip_dict
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
  """Helper for making a multi-node static call

  Unlike L{_MultiNodeCall} this does not consult a configuration object.

  """
  body = serializer.DumpJson(args, indent=False)
  c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
  c.ConnectList(node_list, address_list=address_list,
                read_timeout=read_timeout)
  return c.GetResults()
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
  """Helper for making a single-node call

  @return: the result for the given node; an offline result if the node
      is marked offline

  """
  body = serializer.DumpJson(args, indent=False)
  c = Client(procedure, body, self.port)
  result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
  if result is None:
    # we did connect, node is not offline
    result = c.GetResults()[node]
  return result
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
  """Helper for making a single-node static call

  Unlike L{_SingleNodeCall} this does not consult a configuration object.

  """
  body = serializer.DumpJson(args, indent=False)
  c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
  c.ConnectNode(node, read_timeout=read_timeout)
  return c.GetResults()[node]
573 """Compresses a string for transport over RPC.
575 Small amounts of data are not compressed.
580 @return: Encoded data to send
583 # Small amounts of data are not compressed
585 return (constants.RPC_ENCODING_NONE, data)
587 # Compress with zlib and encode in base64
588 return (constants.RPC_ENCODING_ZLIB_BASE64,
589 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "lv_list", [vg_name])
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Gets the volume group list.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "vg_list", [])
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Get list of storage units.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "storage_list",
                             [su_name, su_args, name, fields])
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Modify a storage unit.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "storage_modify",
                              [su_name, su_args, name, changes])
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Executes an operation on a storage unit.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "storage_execute",
                              [su_name, su_args, name, op])
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Checks if a node has all the bridges given.

  This method checks if all bridges given in the bridges_list are
  present on the remote node, so that an instance that uses interfaces
  on those bridges can be started.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
  """Starts an instance.

  This is a single-node call.

  @param hvp: overridden hypervisor parameters, merged by L{_InstDict}
  @param bep: overridden backend parameters, merged by L{_InstDict}

  """
  idict = self._InstDict(instance, hvp=hvp, bep=bep)
  return self._SingleNodeCall(node, "instance_start", [idict])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Stops an instance.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "instance_shutdown",
                              [self._InstDict(instance), timeout])
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Gather the information necessary to prepare an instance migration.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition

  """
  return self._SingleNodeCall(node, "migration_info",
                              [self._InstDict(instance)])
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepare a node to accept an instance.

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @param target: target hostname (usually ip address) (on the node itself)

  """
  return self._SingleNodeCall(node, "accept_instance",
                              [self._InstDict(instance), info, target])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
  """Finalize any target-node migration specific operation.

  This is called both in case of a successful migration and in case of error
  (in which case it should abort the migration).

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  return self._SingleNodeCall(node, "finalize_migration",
                              [self._InstDict(instance), info, success])
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrate an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  """
  return self._SingleNodeCall(node, "instance_migrate",
                              [self._InstDict(instance), target, live])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
  """Reboots an instance.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "instance_reboot",
                              [self._InstDict(inst), reboot_type,
                               shutdown_timeout])
@_RpcTimeout(_TMO_1DAY)
def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
  """Installs an OS on the given instance.

  This is a single-node call.

  @param osparams: overridden os parameters, merged by L{_InstDict}

  """
  return self._SingleNodeCall(node, "instance_os_add",
                              [self._InstDict(inst, osp=osparams),
                               reinstall, debug])
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Run the OS rename script for an instance.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "instance_run_rename",
                              [self._InstDict(inst), old_name, debug])
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance

  """
  return self._SingleNodeCall(node, "instance_info", [instance, hname])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Checks whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check

  """
  return self._SingleNodeCall(node, "instance_migratable",
                              [self._InstDict(instance)])
@_RpcTimeout(_TMO_URGENT)
def call_all_instances_info(self, node_list, hypervisor_list):
  """Returns information about all instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  return self._MultiNodeCall(node_list, "all_instances_info",
                             [hypervisor_list])
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Returns the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
844 @_RpcTimeout(_TMO_FAST)
845 def call_node_tcp_ping(self, node, source, target, port, timeout,
847 """Do a TcpPing on the remote node
849 This is a single-node call.
852 return self._SingleNodeCall(node, "node_tcp_ping",
853 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Checks if a node has the given IP address.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "node_has_ip_address", [address])
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Return node information.

  This will return memory information and volume group size and free
  space.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
      information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      information

  """
  return self._MultiNodeCall(node_list, "node_info",
                             [vg_name, hypervisor_type])
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
  """Modify hosts file with name

  This is a single-node call.

  @param node: The node to call
  @param mode: The mode to operate. Currently "add" or "remove"
  @param name: The host name to be modified
  @param ip: The ip of the entry (just valid if mode is "add")

  """
  return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Request verification of given parameters.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "node_verify",
                             [checkdict, cluster_name])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tells a node to activate itself as a master.

  This is a single-node call.

  """
  return cls._StaticSingleNodeCall(node, "node_start_master",
                                   [start_daemons, no_voting])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
  """Tells a node to demote itself from master status.

  This is a single-node call.

  """
  return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
  """Query master info.

  This is a multi-node call.

  """
  # TODO: should this method query down nodes?
  return cls._StaticMultiNodeCall(node_list, "master_info", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
  """Query node version.

  This is a multi-node call.

  """
  return cls._StaticMultiNodeCall(node_list, "version", [])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Request creation of a given block device.

  This is a single-node call.

  @param bdev: the block device; sent in its C{ToDict()} form

  """
  return self._SingleNodeCall(node, "blockdev_create",
                              [bdev.ToDict(), size, owner, on_primary, info])
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
  """Request wipe at given offset with given size of a block device.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "blockdev_wipe",
                              [bdev.ToDict(), offset, size])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Request removal of a given block device.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Request rename of the given block devices.

  This is a single-node call.

  @param devlist: iterable of (device, uid) pairs; each device is sent in
      its C{ToDict()} form

  """
  return self._SingleNodeCall(node, "blockdev_rename",
                              [(d.ToDict(), uid) for d, uid in devlist])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Request assembling of a given block device.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "blockdev_assemble",
                              [disk.ToDict(), owner, on_primary])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Request shutdown of a given block device.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_addchildren(self, node, bdev, ndevs):
  """Request adding a list of children to a (mirroring) device.

  This is a single-node call.

  @param bdev: the parent device
  @param ndevs: the child devices to add

  """
  return self._SingleNodeCall(node, "blockdev_addchildren",
                              [bdev.ToDict(),
                               [disk.ToDict() for disk in ndevs]])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_removechildren(self, node, bdev, ndevs):
  """Request removing a list of children from a (mirroring) device.

  This is a single-node call.

  @param bdev: the parent device
  @param ndevs: the child devices to remove

  """
  return self._SingleNodeCall(node, "blockdev_removechildren",
                              [bdev.ToDict(),
                               [disk.ToDict() for disk in ndevs]])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getmirrorstatus(self, node, disks):
  """Request status of a (mirroring) device.

  This is a single-node call.

  @return: the call result; on success its payload is converted to a list
      of L{objects.BlockDevStatus}

  """
  result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                [dsk.ToDict() for dsk in disks])
  if not result.fail_msg:
    result.payload = [objects.BlockDevStatus.FromDict(i)
                      for i in result.payload]
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
  """Request status of (mirroring) devices from multiple nodes.

  This is a multi-node call.

  @param node_disks: dict mapping a name to the list of disks to query
  @return: dict of per-node results; in successful results every
      (success, status) payload entry with a true C{success} has its status
      converted to L{objects.BlockDevStatus}

  """
  result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
                               [dict((name, [dsk.ToDict() for dsk in disks])
                                     for name, disks in node_disks.items())])
  for nres in result.values():
    # NOTE(review): guards restored from context (failed results carry no
    # payload); confirm against the upstream file
    if nres.fail_msg:
      continue

    for idx, (success, status) in enumerate(nres.payload):
      if success:
        nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_find(self, node, disk):
  """Request identification of a given block device.

  This is a single-node call.

  @return: the call result; a non-None payload of a successful call is
      converted to L{objects.BlockDevStatus}

  """
  result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
  if not result.fail_msg and result.payload is not None:
    result.payload = objects.BlockDevStatus.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes the given block devices.

  This is a single-node call.

  """
  params = [instance_name, [cf.ToDict() for cf in disks]]
  return self._SingleNodeCall(node, "blockdev_close", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Returns the size of the given disks.

  This is a single-node call.

  The timeout decorator registers a call under its function name minus the
  "call_" prefix, and the RPC client asserts that the procedure it is given
  is registered. The procedure sent here is "blockdev_getsize", so the
  function carries that name; the previous plural name registered
  "blockdev_getsizes" and the client assertion could not succeed.

  """
  params = [[cf.ToDict() for cf in disks]]
  return self._SingleNodeCall(node, "blockdev_getsize", params)

# Backwards-compatible alias for existing callers of the plural name
call_blockdev_getsizes = call_blockdev_getsize
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given drbd devices.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, [cf.ToDict() for cf in disks]])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches the network of the given drbd devices.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "drbd_attach_net",
                             [nodes_ip, [cf.ToDict() for cf in disks],
                              instance_name, multimaster])
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of drbd devices is complete.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, [cf.ToDict() for cf in disks]])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Gets drbd helper.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "drbd_helper", [])
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Upload a file.

  The file is read locally and sent together with its mode, owner, group
  and access/modification times. The remote node can refuse the operation
  for files it does not accept.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed

  """
  file_contents = utils.ReadFile(file_name)
  data = cls._Compress(file_contents)
  st = os.stat(file_name)
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Write ssconf files.

  This is a multi-node call.

  """
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
@_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
  """Sends a C{run_oob} request for the given program and command.

  This is a single-node call.

  """
  return self._SingleNodeCall(node, "run_oob", [oob_program, command,
                                                remote_node, timeout])
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Request a diagnose of OS definitions.

  This is a multi-node call.

  """
  return self._MultiNodeCall(node_list, "os_diagnose", [])
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  @return: the call result; a dict payload of a successful call is
      converted to L{objects.OS}

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  if not result.fail_msg and isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Run a validation routine for a given OS.

  This is a multi-node call.

  @param nodes: the nodes to call; not part of the RPC parameters

  """
  return self._MultiNodeCall(nodes, "os_validate",
                             [required, name, checks, params])
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param hpath: first RPC argument (presumably the hooks path;
        confirm against the node-side handler)
    @param phase: second RPC argument (presumably the hooks phase;
        confirm against the node-side handler)
    @param env: a dictionary with the environment

    """
    return self._MultiNodeCall(node_list, "hooks_runner",
                               [hpath, phase, env])
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: the iallocator name
    @param idata: the json-encoded input string

    """
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request that the given block device be grown.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the block device; sent in its C{ToDict()} form
    @param amount: forwarded unchanged as the second RPC argument
        (presumably the size to grow by; units are not visible here)

    """
    return self._SingleNodeCall(node, "blockdev_grow",
                                [cf_bdev.ToDict(), amount])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
    """Export a given disk to another node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the disk to export; sent in its C{ToDict()} form
    @param dest_node: forwarded unchanged as the second RPC argument
    @param dest_path: forwarded unchanged as the third RPC argument
    @param cluster_name: forwarded unchanged as the last RPC argument

    """
    rpc_args = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
    return self._SingleNodeCall(node, "blockdev_export", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the block device; sent in its C{ToDict()} form

    """
    bdev_dict = cf_bdev.ToDict()
    return self._SingleNodeCall(node, "blockdev_snapshot", [bdev_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param node: the node the request is sent to
    @param instance: the instance; sent in the form produced by
        C{self._InstDict}
    @param snap_disks: iterable whose entries are either booleans or
        objects offering C{ToDict()}

    """
    flat_disks = []
    for disk in snap_disks:
        if isinstance(disk, bool):
            # Boolean placeholders have no dict form; pass them through.
            flat_disks.append(disk)
        else:
            flat_disks.append(disk.ToDict())

    return self._SingleNodeCall(node, "finalize_export",
                                [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
    """Queries the export information in a given path.

    This is a single-node call.

    @param node: the node the request is sent to
    @param path: the path to query; sent as the only RPC argument

    """
    rpc_args = [path]
    return self._SingleNodeCall(node, "export_info", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
    """Gets the stored exports list.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to

    """
    # The request carries no arguments.
    rpc_args = []
    return self._MultiNodeCall(node_list, "export_list", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
    """Requests removal of a given export.

    This is a single-node call.

    @param node: the node the request is sent to
    @param export: the export to remove; sent as the only RPC argument

    """
    rpc_args = [export]
    return self._SingleNodeCall(node, "export_remove", rpc_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Requests a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti data
    directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param modify_ssh_setup: forwarded unchanged as the only RPC
        argument

    """
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                     [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
    """Gets all volumes on node(s).

    This is a multi-node call.

    @param node_list: the nodes the request is sent to

    """
    # The request carries no arguments.
    rpc_args = []
    return self._MultiNodeCall(node_list, "node_volumes", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: the node the request is sent to

    """
    # The request carries no arguments.
    rpc_args = []
    return self._SingleNodeCall(node, "node_demote_from_mc", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
    """Tries to powercycle a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param hypervisor: forwarded unchanged as the only RPC argument

    """
    rpc_args = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", rpc_args)
def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param duration: the sleep time; sent as the only RPC argument and
        also used to derive the read timeout of the call

    """
    # Wait five units longer than the requested sleep, truncated to an
    # integer, so the call does not time out before the nodes wake up.
    read_timeout = int(duration + 5)
    return self._MultiNodeCall(node_list, "test_delay", [duration],
                               read_timeout=read_timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
    """Create the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: the directory to create; sent as the only
        RPC argument

    """
    return self._SingleNodeCall(node, "file_storage_dir_create",
                                [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Remove the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: the directory to remove; sent as the only
        RPC argument

    """
    return self._SingleNodeCall(node, "file_storage_dir_remove",
                                [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param old_file_storage_dir: the current directory; first RPC
        argument
    @param new_file_storage_dir: the new directory; second RPC argument

    """
    rpc_args = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update job queue.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: passed to the underlying call as its
        C{address_list} keyword
    @param file_name: the job queue file name; first RPC argument
    @param content: the file content; sent after being passed through
        C{cls._Compress}

    """
    rpc_args = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                    address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
    """Send a C{jobqueue_purge} request to a node.

    This is a single-node call.

    @param node: the node the request is sent to

    """
    # The request carries no arguments.
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: passed to the underlying call as its
        C{address_list} keyword
    @param rename: used directly as the RPC argument list

    """
    # "rename" is handed over as the argument list itself; it is
    # deliberately not wrapped in another list.
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                    address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Validate the hypervisor params.

    The given parameters are combined, via L{objects.FillDict}, with
    the cluster-level parameters stored for C{hvname} (an empty dict if
    the cluster has none), and the combined set is what gets sent.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated

    """
    cluster = self._cfg.GetClusterInfo()
    cluster_defaults = cluster.hvparams.get(hvname, {})
    hv_full = objects.FillDict(cluster_defaults, hvparams)
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
    """Creates a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: the node the request is sent to
    @param validity: Validity in seconds

    """
    rpc_args = [validity]
    return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
    """Removes a X509 certificate.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: Certificate name

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
    """Starts a listener for an import.

    This is a single-node call.

    @param node: Node name
    @param opts: import options; sent in their C{ToDict()} form
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param dest: forwarded unchanged, and also used to encode
        C{dest_args}
    @param dest_args: encoded with C{_EncodeImportExportIO} before
        being sent

    """
    rpc_args = [
        opts.ToDict(),
        self._InstDict(instance),
        dest,
        _EncodeImportExportIO(dest, dest_args),
    ]
    return self._SingleNodeCall(node, "import_start", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
    """Starts an export daemon.

    This is a single-node call.

    @param node: Node name
    @param opts: export options; sent in their C{ToDict()} form
    @param host: forwarded unchanged as the second RPC argument
    @param port: forwarded unchanged as the third RPC argument
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param source: forwarded unchanged, and also used to encode
        C{source_args}
    @param source_args: encoded with C{_EncodeImportExportIO} before
        being sent

    """
    rpc_args = [
        opts.ToDict(),
        host,
        port,
        self._InstDict(instance),
        source,
        _EncodeImportExportIO(source, source_args),
    ]
    return self._SingleNodeCall(node, "export_start", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
    """Gets the status of an import or export.

    This is a single-node call.

    @param node: Node name
    @type names: List of strings
    @param names: Import/export names
    @rtype: List of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or
        None if a status couldn't be retrieved

    """
    result = self._SingleNodeCall(node, "impexp_status", [names])

    if not result.fail_msg:
        decoded = []
        for i in result.payload:
            if i is None:
                # No status could be retrieved for this name; keep the
                # placeholder so positions still line up with "names".
                decoded.append(None)
            else:
                decoded.append(objects.ImportExportStatus.FromDict(i))
        result.payload = decoded

    return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
    """Aborts an import or export.

    This is a single-node call.

    @param node: Node name
    @param name: Import/export name

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_abort", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
    """Cleans up after an import or export.

    This is a single-node call.

    @param node: Node name
    @param name: Import/export name

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)