4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
# C0103: Invalid name, since call_* method names are not valid
# R0201: Method could be a function; we keep all RPCs as instance methods
# so as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
51 # Special module generated at build time
52 from ganeti import _generated_rpc
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client # pylint: disable=W0611
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
74 # Timeout table that will be built later by decorators
75 # Guidelines for choosing timeouts:
76 # - call used during watcher: timeout -> 1min, _TMO_URGENT
77 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
78 # - other calls: 15 min, _TMO_NORMAL
79 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
84 #: Special value to describe an offline host
89 """Initializes the module-global HTTP client manager.
91 Must be called before using any RPC function and while exactly one thread is
95 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
96 # one thread running. This check is just a safety measure -- it doesn't
98 assert threading.activeCount() == 1, \
99 "Found more than one active thread when initializing pycURL"
101 logging.info("Using PycURL %s", pycurl.version)
103 pycurl.global_init(pycurl.GLOBAL_ALL)
107 """Stops the module-global HTTP client manager.
109 Must be called before quitting the program and while exactly one thread is
113 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
    """Applies the node-daemon TLS and connection settings to a curl handle.

    The node daemon certificate file serves as the CA bundle, the client
    certificate and the client key. The peer certificate is verified, but
    the host name is not checked. Redirects are not followed, and the
    connection timeout is L{_RPC_CONNECT_TIMEOUT}.

    @param curl: a pycurl handle to configure in place

    """
    cert_path = str(constants.NODED_CERT_FILE)
    settings = [
        (pycurl.FOLLOWLOCATION, False),
        (pycurl.CAINFO, cert_path),
        # Host name checking is off; trust comes from verifying the peer
        # certificate against the node daemon certificate above
        (pycurl.SSL_VERIFYHOST, 0),
        (pycurl.SSL_VERIFYPEER, True),
        (pycurl.SSLCERTTYPE, "PEM"),
        (pycurl.SSLCERT, cert_path),
        (pycurl.SSLKEYTYPE, "PEM"),
        (pycurl.SSLKEY, cert_path),
        (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]
    for (option, value) in settings:
        curl.setopt(option, value)
130 def _RpcTimeout(secs):
131 """Timeout decorator.
133 When applied to a rpc call_* function, it updates the global timeout
134 table with the given function/timeout.
139 assert name.startswith("call_")
140 _TIMEOUTS[name[len("call_"):]] = secs
146 """RPC-wrapper decorator.
148 When applied to a function, it runs it with the RPC system
149 initialized, and it shutsdown the system afterwards. This means the
150 function must be called without RPC being initialized.
153 def wrapper(*args, **kwargs):
156 return fn(*args, **kwargs)
163 """Compresses a string for transport over RPC.
165 Small amounts of data are not compressed.
170 @return: Encoded data to send
173 # Small amounts of data are not compressed
175 return (constants.RPC_ENCODING_NONE, data)
177 # Compress with zlib and encode in base64
178 return (constants.RPC_ENCODING_ZLIB_BASE64,
179 base64.b64encode(zlib.compress(data, 3)))
182 class RpcResult(object):
185 This class holds an RPC result. It is needed since in multi-node
186 calls we can't raise an exception just because one one out of many
187 failed, and therefore we use this class to encapsulate the result.
189 @ivar data: the data payload, for successful results, or None
190 @ivar call: the name of the RPC call
191 @ivar node: the name of the node to which we made the call
192 @ivar offline: whether the operation failed because the node was
193 offline, as opposed to actual failure; offline=True will always
194 imply failed=True, in order to allow simpler checking if
195 the user doesn't care about the exact failure mode
196 @ivar fail_msg: the error message if the call failed
199 def __init__(self, data=None, failed=False, offline=False,
200 call=None, node=None):
201 self.offline = offline
206 self.fail_msg = "Node is marked offline"
207 self.data = self.payload = None
209 self.fail_msg = self._EnsureErr(data)
210 self.data = self.payload = None
213 if not isinstance(self.data, (tuple, list)):
214 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
218 self.fail_msg = ("RPC layer error: invalid result length (%d), "
219 "expected 2" % len(self.data))
221 elif not self.data[0]:
222 self.fail_msg = self._EnsureErr(self.data[1])
227 self.payload = data[1]
229 for attr_name in ["call", "data", "fail_msg",
230 "node", "offline", "payload"]:
231 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
235 """Helper to ensure we return a 'True' value for error."""
239 return "No error information"
241 def Raise(self, msg, prereq=False, ecode=None):
242 """If the result has failed, raise an OpExecError.
244 This is used so that LU code doesn't have to check for each
245 result, but instead can call this function.
248 if not self.fail_msg:
251 if not msg: # one could pass None for default message
252 msg = ("Call '%s' to node '%s' has failed: %s" %
253 (self.call, self.node, self.fail_msg))
255 msg = "%s: %s" % (msg, self.fail_msg)
257 ec = errors.OpPrereqError
259 ec = errors.OpExecError
260 if ecode is not None:
264 raise ec(*args) # pylint: disable=W0142
267 def _SsconfResolver(node_list,
268 ssc=ssconf.SimpleStore,
269 nslookup_fn=netutils.Hostname.GetIP):
270 """Return addresses for given node names.
272 @type node_list: list
273 @param node_list: List of node names
275 @param ssc: SimpleStore class that is used to obtain node->ip mappings
276 @type nslookup_fn: callable
277 @param nslookup_fn: function use to do NS lookup
278 @rtype: list of tuple; (string, string)
279 @return: List of tuples containing node name and IP address
283 iplist = ss.GetNodePrimaryIPList()
284 family = ss.GetPrimaryIPFamily()
285 ipmap = dict(entry.split() for entry in iplist)
288 for node in node_list:
291 ip = nslookup_fn(node, family=family)
292 result.append((node, ip))
297 class _StaticResolver:
298 def __init__(self, addresses):
299 """Initializes this class.
302 self._addresses = addresses
304 def __call__(self, hosts):
305 """Returns static addresses for hosts.
308 assert len(hosts) == len(self._addresses)
309 return zip(hosts, self._addresses)
312 def _CheckConfigNode(name, node):
313 """Checks if a node is online.
316 @param name: Node name
317 @type node: L{objects.Node} or None
318 @param node: Node object
322 # Depend on DNS for name resolution
331 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
332 """Calculate node addresses using configuration.
335 # Special case for single-host lookups
338 return [_CheckConfigNode(name, single_node_fn(name))]
340 all_nodes = all_nodes_fn()
341 return [_CheckConfigNode(name, all_nodes.get(name, None))
346 def __init__(self, resolver, port, lock_monitor_cb=None):
347 """Initializes this class.
349 @param resolver: callable accepting a list of hostnames, returning a list
350 of tuples containing name and IP address (IP address can be the name or
351 the special value L{_OFFLINE} to mark offline machines)
353 @param port: TCP port
354 @param lock_monitor_cb: Callable for registering with lock monitor
357 self._resolver = resolver
359 self._lock_monitor_cb = lock_monitor_cb
362 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
363 """Prepares requests by sorting offline hosts into separate list.
369 for (name, ip) in hosts:
371 # Node is marked as offline
372 results[name] = RpcResult(node=name, offline=True, call=procedure)
375 http.client.HttpClientRequest(str(ip), port,
376 http.HTTP_PUT, str("/%s" % procedure),
377 headers=_RPC_CLIENT_HEADERS,
379 read_timeout=read_timeout,
380 nicename="%s/%s" % (name, procedure),
381 curl_config_fn=_ConfigRpcCurl)
383 return (results, requests)
386 def _CombineResults(results, requests, procedure):
387 """Combines pre-computed results for offline hosts with actual call results.
390 for name, req in requests.items():
391 if req.success and req.resp_status_code == http.HTTP_OK:
392 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393 node=name, call=procedure)
395 # TODO: Better error reporting
401 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402 host_result = RpcResult(data=msg, failed=True, node=name,
405 results[name] = host_result
def __call__(self, hosts, procedure, body, read_timeout=None,
             _req_process_fn=http.client.ProcessRequests):
    """Makes an RPC request to a number of nodes.

    Hosts are resolved with the configured resolver. Hosts resolved as
    offline get a result immediately; requests are built and executed for
    all other hosts.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @param body: Request body (passed through C{str()} before sending)
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request; when None, the value
        registered for C{procedure} by the C{_RpcTimeout} decorator is used
    @param _req_process_fn: callable that executes the prepared requests
        (presumably overridable for testing)
    @return: per-host results as combined by C{_CombineResults}

    """
    timeout = read_timeout
    if timeout is None:
        timeout = _TIMEOUTS.get(procedure, None)

    assert timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    resolved = self._resolver(hosts)
    (offline_results, pending) = self._PrepareRequests(resolved, self._port,
                                                       procedure, str(body),
                                                       timeout)

    _req_process_fn(pending.values(), lock_monitor_cb=self._lock_monitor_cb)

    # Offline hosts must never also have a real request issued for them
    assert not frozenset(offline_results).intersection(pending)

    return self._CombineResults(offline_results, pending, procedure)
440 def _EncodeImportExportIO(ieio, ieioargs):
441 """Encodes import/export I/O information.
444 if ieio == constants.IEIO_RAW_DISK:
445 assert len(ieioargs) == 1
446 return (ieioargs[0].ToDict(), )
448 if ieio == constants.IEIO_SCRIPT:
449 assert len(ieioargs) == 2
450 return (ieioargs[0].ToDict(), ieioargs[1])
455 class RpcRunner(_generated_rpc.RpcClientDefault):
def __init__(self, context):
    """Initialized the RPC runner.

    Creates the RPC processor. Node addresses come from the cluster
    configuration, requests go to the node daemon port, and requests are
    registered with the context's lock monitor.

    @type context: C{masterd.GanetiContext}
    @param context: Ganeti context

    """
    _generated_rpc.RpcClientDefault.__init__(self)

    self._cfg = context.cfg
    cfg = self._cfg
    resolver = compat.partial(_NodeConfigResolver,
                              cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)
    noded_port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(resolver, noded_port,
                               lock_monitor_cb=context.glm.AddToLockMonitor)
475 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
476 """Convert the given instance to a dict.
478 This is done via the instance's ToDict() method and additionally
479 we fill the hvparams with the cluster defaults.
481 @type instance: L{objects.Instance}
482 @param instance: an Instance object
483 @type hvp: dict or None
484 @param hvp: a dictionary with overridden hypervisor parameters
485 @type bep: dict or None
486 @param bep: a dictionary with overridden backend parameters
487 @type osp: dict or None
488 @param osp: a dictionary with overridden os parameters
490 @return: the instance dict, with the hvparams filled with the
494 idict = instance.ToDict()
495 cluster = self._cfg.GetClusterInfo()
496 idict["hvparams"] = cluster.FillHV(instance)
498 idict["hvparams"].update(hvp)
499 idict["beparams"] = cluster.FillBE(instance)
501 idict["beparams"].update(bep)
502 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
504 idict["osparams"].update(osp)
505 for nic in idict["nics"]:
506 nic['nicparams'] = objects.FillDict(
507 cluster.nicparams[constants.PP_DEFAULT],
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Helper for making a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to contact
    @type procedure: string
    @param procedure: RPC procedure name
    @param args: arguments, serialized to JSON as the request body
    @param read_timeout: optional read timeout override
    @return: per-node results from the RPC processor

    """
    return self._proc(node_list, procedure,
                      serializer.DumpJson(args, indent=False),
                      read_timeout=read_timeout)
518 def _Call(self, node_list, procedure, timeout, args):
519 """Entry point for automatically generated RPC wrappers.
522 return self._MultiNodeCall(node_list, procedure, args, read_timeout=timeout)
525 def _StaticMultiNodeCall(node_list, procedure, args,
526 address_list=None, read_timeout=None):
527 """Helper for making a multi-node static call
530 body = serializer.DumpJson(args, indent=False)
532 if address_list is None:
533 resolver = _SsconfResolver
535 # Caller provided an address list
536 resolver = _StaticResolver(address_list)
538 proc = _RpcProcessor(resolver,
539 netutils.GetDaemonPort(constants.NODED))
540 return proc(node_list, procedure, body, read_timeout=read_timeout)
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
    """Helper for making a single-node call.

    @type node: string
    @param node: name of the node to contact
    @type procedure: string
    @param procedure: RPC procedure name
    @param args: arguments, serialized to JSON as the request body
    @param read_timeout: optional read timeout override
    @return: the result for C{node} alone

    """
    encoded = serializer.DumpJson(args, indent=False)
    per_node = self._proc([node], procedure, encoded, read_timeout=read_timeout)
    return per_node[node]
550 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
551 """Helper for making a single-node static call
554 body = serializer.DumpJson(args, indent=False)
555 proc = _RpcProcessor(_SsconfResolver,
556 netutils.GetDaemonPort(constants.NODED))
557 return proc([node], procedure, body, read_timeout=read_timeout)[node]
560 def _BlockdevFindPostProc(result):
561 if not result.fail_msg and result.payload is not None:
562 result.payload = objects.BlockDevStatus.FromDict(result.payload)
566 def _BlockdevGetMirrorStatusPostProc(result):
567 if not result.fail_msg:
568 result.payload = [objects.BlockDevStatus.FromDict(i)
569 for i in result.payload]
573 def _BlockdevGetMirrorStatusMultiPostProc(result):
574 for nres in result.values():
578 for idx, (success, status) in enumerate(nres.payload):
580 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
585 def _OsGetPostProc(result):
586 if not result.fail_msg and isinstance(result.payload, dict):
587 result.payload = objects.OS.FromDict(result.payload)
591 def _PrepareFinalizeExportDisks(snap_disks):
594 for disk in snap_disks:
595 if isinstance(disk, bool):
596 flat_disks.append(disk)
598 flat_disks.append(disk.ToDict())
603 def _ImpExpStatusPostProc(result):
604 """Post-processor for import/export status.
606 @rtype: Payload containing list of L{objects.ImportExportStatus} instances
607 @return: Returns a list of the state of each named import/export or None if
608 a status couldn't be retrieved
611 if not result.fail_msg:
614 for i in result.payload:
618 decoded.append(objects.ImportExportStatus.FromDict(i))
620 result.payload = decoded
@_RpcTimeout(_TMO_URGENT)
def call_bdev_sizes(self, node_list, devices):
    """Gets the sizes of the requested block devices on each node.

    This is a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to query
    @param devices: the devices to report on, sent as the only argument

    """
    rpc_args = [devices]
    return self._MultiNodeCall(node_list, "bdev_sizes", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
    """Gets the logical volumes present in a given volume group.

    This is a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to query
    @param vg_name: name of the volume group to inspect

    """
    rpc_args = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
    """Gets the volume group list.

    This is a multi-node call that takes no arguments.

    @type node_list: list
    @param node_list: names of the nodes to query

    """
    return self._MultiNodeCall(node_list, "vg_list", list())
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Get list of storage units.

    This is a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to query
    @param su_name: storage unit type name
    @param su_args: storage unit type arguments
    @param name: storage unit name filter, passed through unchanged
    @param fields: fields to return

    """
    rpc_args = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Modify a storage unit.

    This is a single-node call.

    @param node: name of the node holding the storage unit
    @param su_name: storage unit type name
    @param su_args: storage unit type arguments
    @param name: name of the storage unit to modify
    @param changes: the modifications to apply

    """
    rpc_args = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
    """Executes an operation on a storage unit.

    This is a single-node call.

    @param node: name of the node holding the storage unit
    @param su_name: storage unit type name
    @param su_args: storage unit type arguments
    @param name: name of the storage unit
    @param op: the operation to execute

    """
    rpc_args = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
    """Checks if a node has all the bridges given.

    Used to verify that every bridge in C{bridges_list} exists on the
    remote node, so that an instance with interfaces on those bridges can
    be started there.

    This is a single-node call.

    @param node: name of the node to check
    @param bridges_list: the bridges that must be present

    """
    rpc_args = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep, startup_paused):
    """Starts an instance.

    This is a single-node call.

    @param node: name of the node on which to start the instance
    @param instance: the instance to start
    @param hvp: hypervisor parameter overrides (see C{_InstDict})
    @param bep: backend parameter overrides (see C{_InstDict})
    @param startup_paused: passed through to the node unchanged

    """
    return self._SingleNodeCall(node, "instance_start",
                                [self._InstDict(instance, hvp=hvp, bep=bep),
                                 startup_paused])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
    """Stops an instance.

    This is a single-node call.

    @param node: name of the node running the instance
    @param instance: the instance to stop
    @param timeout: shutdown timeout, passed through to the node

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_shutdown", [inst_dict, timeout])
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
    """Gather the information necessary to prepare an instance migration.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "migration_info", [inst_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
    """Prepare a node to accept an instance.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @param target: target hostname (usually ip address) (on the node itself)

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "accept_instance",
                                [inst_dict, info, target])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_finalize_migration_dst(self, node, instance, info, success):
    """Finalize any target-node migration specific operation.

    Called after both successful and failed migrations. After a failure,
    it should abort the migration.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
                                [inst_dict, info, success])
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
    """Migrate an instance.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_migrate",
                                [inst_dict, target, live])
@_RpcTimeout(_TMO_SLOW)
def call_instance_finalize_migration_src(self, node, instance, success, live):
    """Finalize the instance migration on the source node.

    This is a single-node call.

    @param node: the source node of the migration
    @type instance: L{objects.Instance}
    @param instance: the instance that was migrated
    @param success: whether the migration succeeded or not
    @param live: whether the user requested a live migration or not

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
                                [inst_dict, success, live])
811 @_RpcTimeout(_TMO_SLOW)
812 def call_instance_get_migration_status(self, node, instance):
813 """Report migration status.
815 This is a single-node call that must be executed on the source node.
817 @type instance: L{objects.Instance}
818 @param instance: the instance that is being migrated
819 @rtype: L{objects.MigrationStatus}
820 @return: the status of the current migration (one of
821 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
822 progress info that can be retrieved from the hypervisor
825 result = self._SingleNodeCall(node, "instance_get_migration_status",
826 [self._InstDict(instance)])
827 if not result.fail_msg and result.payload is not None:
828 result.payload = objects.MigrationStatus.FromDict(result.payload)
831 @_RpcTimeout(_TMO_NORMAL)
832 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
833 """Reboots an instance.
835 This is a single-node call.
838 return self._SingleNodeCall(node, "instance_reboot",
839 [self._InstDict(inst), reboot_type,
842 @_RpcTimeout(_TMO_1DAY)
843 def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
844 """Installs an OS on the given instance.
846 This is a single-node call.
849 return self._SingleNodeCall(node, "instance_os_add",
850 [self._InstDict(inst, osp=osparams),
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
    """Run the OS rename script for an instance.

    This is a single-node call.

    @param node: name of the node to run the script on
    @param inst: the (already renamed) instance object
    @param old_name: the instance's previous name
    @param debug: debug flag, passed through to the node

    """
    inst_dict = self._InstDict(inst)
    return self._SingleNodeCall(node, "instance_run_rename",
                                [inst_dict, old_name, debug])
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
    """Returns information about a single instance.

    This is a single-node call.

    @type node: string
    @param node: the node to query (a single node name, not a list)
    @type instance: string
    @param instance: the instance name
    @type hname: string
    @param hname: the hypervisor type of the instance

    """
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
    """Checks whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_migratable", [inst_dict])
894 @_RpcTimeout(_TMO_URGENT)
895 def call_all_instances_info(self, node_list, hypervisor_list):
896 """Returns information about all instances on the given nodes.
898 This is a multi-node call.
900 @type node_list: list
901 @param node_list: the list of nodes to query
902 @type hypervisor_list: list
903 @param hypervisor_list: the hypervisors to query for instances
906 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
    """Returns the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    rpc_args = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
    """Checks if a node has the given IP address.

    This is a single-node call.

    @param node: name of the node to check
    @param address: the IP address to look for

    """
    rpc_args = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Return node information.

    Returns memory information, plus the size and free space of the given
    volume group.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
        information
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for memory
        information

    """
    rpc_args = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
    """Modify the hosts file entry for the given name.

    This is a single-node call.

    @param node: The node to call
    @param mode: The mode to operate. Currently "add" or "remove"
    @param name: The host name to be modified
    @param ip: The ip of the entry (just valid if mode is "add")

    """
    rpc_args = [mode, name, ip]
    return self._SingleNodeCall(node, "etc_hosts_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Request verification of given parameters.

    This is a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to verify
    @param checkdict: the checks to run, passed through unchanged
    @param cluster_name: the cluster name

    """
    rpc_args = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", rpc_args)
981 @_RpcTimeout(_TMO_FAST)
982 def call_node_start_master_daemons(cls, node, no_voting):
983 """Starts master daemons on a node.
985 This is a single-node call.
988 return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
992 @_RpcTimeout(_TMO_FAST)
993 def call_node_activate_master_ip(cls, node):
994 """Activates master IP on a node.
996 This is a single-node call.
999 return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
1002 @_RpcTimeout(_TMO_FAST)
1003 def call_node_stop_master(cls, node):
1004 """Deactivates master IP and stops master daemons on a node.
1006 This is a single-node call.
1009 return cls._StaticSingleNodeCall(node, "node_stop_master", [])
1012 @_RpcTimeout(_TMO_FAST)
1013 def call_node_deactivate_master_ip(cls, node):
1014 """Deactivates master IP on a node.
1016 This is a single-node call.
1019 return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
1022 @_RpcTimeout(_TMO_FAST)
1023 def call_node_change_master_netmask(cls, node, netmask):
1024 """Change master IP netmask.
1026 This is a single-node call.
1029 return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
1033 @_RpcTimeout(_TMO_URGENT)
1034 def call_master_info(cls, node_list):
1035 """Query master info.
1037 This is a multi-node call.
1040 # TODO: should this method query down nodes?
1041 return cls._StaticMultiNodeCall(node_list, "master_info", [])
1044 @_RpcTimeout(_TMO_URGENT)
1045 def call_version(cls, node_list):
1046 """Query node version.
1048 This is a multi-node call.
1051 return cls._StaticMultiNodeCall(node_list, "version", [])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    This is a single-node call.

    @param node: name of the node to create the device on
    @param bdev: block device object, serialized with its C{ToDict()}
    @param size: requested size
    @param owner: owner passed through to the node
    @param on_primary: whether this is the primary node
    @param info: extra information passed through to the node

    """
    bdev_dict = bdev.ToDict()
    return self._SingleNodeCall(node, "blockdev_create",
                                [bdev_dict, size, owner, on_primary, info])
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
    """Request wipe at given offset with given size of a block device.

    This is a single-node call.

    @param node: name of the node holding the device
    @param bdev: block device object, serialized with its C{ToDict()}
    @param offset: where the wipe starts
    @param size: how much to wipe

    """
    bdev_dict = bdev.ToDict()
    return self._SingleNodeCall(node, "blockdev_wipe", [bdev_dict, offset, size])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    @param node: name of the node holding the device
    @param bdev: block device object, serialized with its C{ToDict()}

    """
    bdev_dict = bdev.ToDict()
    return self._SingleNodeCall(node, "blockdev_remove", [bdev_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
    """Request rename of the given block devices.

    This is a single-node call.

    @param node: name of the node holding the devices
    @param devlist: pairs of (block device object, new identifier); each
        device is serialized with C{ToDict()} and the identifier is sent as is

    """
    renames = []
    for (disk, ident) in devlist:
        renames.append((disk.ToDict(), ident))
    return self._SingleNodeCall(node, "blockdev_rename", [renames])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_pause_resume_sync(self, node, disks, pause):
    """Request a pause/resume of given block device.

    This is a single-node call.

    @param node: name of the node holding the devices
    @param disks: block device objects, each serialized with C{ToDict()}
    @param pause: whether to pause (true) or resume (false) the sync

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
                                [disk_dicts, pause])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
    """Request assembling of a given block device.

    This is a single-node call.

    @param node: name of the node to assemble the device on
    @param disk: block device object, serialized with its C{ToDict()}
    @param owner: owner passed through to the node
    @param on_primary: whether this is the primary node
    @param idx: disk index passed through to the node

    """
    disk_dict = disk.ToDict()
    return self._SingleNodeCall(node, "blockdev_assemble",
                                [disk_dict, owner, on_primary, idx])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    @param node: name of the node holding the device
    @param disk: block device object, serialized with its C{ToDict()}

    """
    disk_dict = disk.ToDict()
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk_dict])
1121 @_RpcTimeout(_TMO_NORMAL)
1122 def call_blockdev_addchildren(self, node, bdev, ndevs):
1123 """Request adding a list of children to a (mirroring) device.
1125 This is a single-node call.
1128 return self._SingleNodeCall(node, "blockdev_addchildren",
1130 [disk.ToDict() for disk in ndevs]])
1132 @_RpcTimeout(_TMO_NORMAL)
1133 def call_blockdev_removechildren(self, node, bdev, ndevs):
1134 """Request removing a list of children from a (mirroring) device.
1136 This is a single-node call.
1139 return self._SingleNodeCall(node, "blockdev_removechildren",
1141 [disk.ToDict() for disk in ndevs]])
1143 @_RpcTimeout(_TMO_NORMAL)
1144 def call_blockdev_getmirrorstatus(self, node, disks):
1145 """Request status of a (mirroring) device.
1147 This is a single-node call.
1150 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1151 [dsk.ToDict() for dsk in disks])
1152 if not result.fail_msg:
1153 result.payload = [objects.BlockDevStatus.FromDict(i)
1154 for i in result.payload]
1157 @_RpcTimeout(_TMO_NORMAL)
1158 def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1159 """Request status of (mirroring) devices from multiple nodes.
1161 This is a multi-node call.
1164 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1165 [dict((name, [dsk.ToDict() for dsk in disks])
1166 for name, disks in node_disks.items())])
1167 for nres in result.values():
1171 for idx, (success, status) in enumerate(nres.payload):
1173 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1177 @_RpcTimeout(_TMO_NORMAL)
1178 def call_blockdev_find(self, node, disk):
1179 """Request identification of a given block device.
1181 This is a single-node call.
1184 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1185 if not result.fail_msg and result.payload is not None:
1186 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
    """Closes the given block devices.

    This is a single-node call.

    @param node: name of the node holding the devices
    @param instance_name: name of the instance owning the devices
    @param disks: block device objects, each serialized with C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
    """Returns the size of the given disks.

    This is a single-node call.

    @param node: name of the node holding the disks
    @param disks: block device objects, each serialized with C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnects the network of the given drbd devices.

    This is a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to contact
    @param nodes_ip: node addresses, passed through unchanged
    @param disks: drbd device objects, each serialized with C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Attaches (connects) the network of the given drbd devices.

    This is a multi-node call.

    @type node_list: list
    @param node_list: names of the nodes to contact
    @param nodes_ip: node addresses, passed through unchanged
    @param disks: drbd device objects, each serialized with C{ToDict()}
    @param instance_name: name of the instance owning the devices
    @param multimaster: flag passed through to the node unchanged

    """
    return self._MultiNodeCall(node_list, "drbd_attach_net",
                               [nodes_ip, [cf.ToDict() for cf in disks],
                                instance_name, multimaster])
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Wait until the synchronization of the given DRBD devices is complete.

  This is a multi-node call registered with the slow RPC timeout.

  @param node_list: the nodes to contact
  @param nodes_ip: node IP information, forwarded unchanged
  @param disks: the DRBD disk objects to wait for

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Get the DRBD helper from the given nodes.

  This is a multi-node call; the procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "drbd_helper", no_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Upload a file.

  The node will refuse the operation in case the file is not on the
  node's list of approved files.

  This is a multi-node call; as a classmethod it can be invoked on the
  class itself as well as on an instance.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed

  """
  file_contents = utils.ReadFile(file_name)
  data = _Compress(file_contents)
  st = os.stat(file_name)
  getents = runtime.GetEnts()
  # Besides the compressed contents, send mode, ownership (uid/gid run
  # through runtime.GetEnts lookups) and timestamps so the node can
  # recreate the file's metadata
  params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
  return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Write ssconf files.

  This is a multi-node call; as a classmethod it can be invoked on the
  class itself as well as on an instance.

  @type node_list: list
  @param node_list: the nodes on which to write the files
  @param values: the ssconf values, sent as the single RPC argument

  """
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
@_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
  """Run an out-of-band (OOB) command.

  This is a single-node call.

  @param node: the node on which to run the OOB program
  @param oob_program: the OOB program to execute
  @param command: the OOB command to run
  @param remote_node: presumably the node the command targets
  @param timeout: timeout sent as an RPC argument (distinct from the RPC
      read timeout)

  """
  oob_args = [oob_program, command, remote_node, timeout]
  return self._SingleNodeCall(node, "run_oob", oob_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Ask the given nodes to diagnose their OS definitions.

  This is a multi-node call; the procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  @param node: the node to query
  @param name: the OS name
  @return: the RPC result; when the call succeeded and the payload is a
      dict, the payload is converted to an L{objects.OS}

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  if not result.fail_msg and isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, nodes, required, name, checks, params):
  """Run the validation routine of a given OS on several nodes.

  This is a multi-node call.

  @param nodes: the nodes on which to validate
  @param required: forwarded unchanged as the first RPC argument
  @param name: the OS name
  @param checks: the checks to perform, forwarded unchanged
  @param params: the OS parameters to validate

  """
  validate_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", validate_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes on which to run the hooks
  @param hpath: the hooks path, forwarded unchanged
  @param phase: the hooks phase, forwarded unchanged
  @type env: dict
  @param env: a dictionary with the environment

  """
  params = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", params)
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @param node: the node on which to run the iallocator
  @type name: str
  @param name: the iallocator name
  @type idata: str
  @param idata: the json-encoded input string

  """
  return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
  """Request growing the given block device.

  This is a single-node call.

  @param node: the node holding the device
  @param cf_bdev: the disk object to grow
  @param amount: the amount to grow by, forwarded unchanged (unit is
      defined by the node backend -- presumably MiB, confirm)
  @param dryrun: flag forwarded unchanged; presumably requests a
      simulation only

  """
  return self._SingleNodeCall(node, "blockdev_grow",
                              [cf_bdev.ToDict(), amount, dryrun])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a given disk to another node.

  This is a single-node call registered with the one-day RPC timeout.

  @param node: the node holding the disk
  @param cf_bdev: the disk object to export
  @param dest_node: the node to export to
  @param dest_path: the destination path on C{dest_node}
  @param cluster_name: the cluster name, sent as the last RPC argument

  """
  return self._SingleNodeCall(node, "blockdev_export",
                              [cf_bdev.ToDict(), dest_node, dest_path,
                               cluster_name])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param node: the node holding the device
  @param cf_bdev: the disk object to snapshot

  """
  bdev_dict = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_snapshot", [bdev_dict])
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  directory of the node.

  This is a single-node call; as a classmethod it can be invoked on the
  class itself as well as on an instance.

  @param node: the node leaving the cluster
  @param modify_ssh_setup: flag forwarded unchanged; presumably controls
      whether the node's SSH setup is modified as well

  """
  return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                   [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Get all volumes on the given node(s).

  This is a multi-node call; the procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call; the procedure takes no arguments.

  @param node: the node to demote

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Try to powercycle a node.

  This is a single-node call.

  @param node: the node to powercycle
  @param hypervisor: hypervisor name, forwarded unchanged

  """
  powercycle_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", powercycle_args)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on the given node(s).

  This is a multi-node call. The read timeout is derived from the
  requested duration (plus five seconds of slack) instead of a fixed
  timeout class.

  @param node_list: the nodes that should sleep
  @param duration: how long to sleep, in seconds

  """
  timeout = int(duration + 5)
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node on which to create the directory
  @param file_storage_dir: the directory to create

  """
  return self._SingleNodeCall(node, "file_storage_dir_create",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node on which to remove the directory
  @param file_storage_dir: the directory to remove

  """
  return self._SingleNodeCall(node, "file_storage_dir_remove",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename a file storage directory.

  This is a single-node call.

  @param node: the node holding the directory
  @param old_file_storage_dir: the current directory name
  @param new_file_storage_dir: the new directory name

  """
  rename_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rename_args)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update a job queue file on the given nodes.

  This is a multi-node call; as a classmethod it can be invoked on the
  class itself as well as on an instance.

  @type node_list: list
  @param node_list: the nodes to update
  @param address_list: node addresses, passed on to the RPC layer
  @param file_name: the job queue file to write
  @param content: the new file contents; compressed before sending

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, _Compress(content)],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge the job queue on a node.

  This is a single-node call; as a classmethod it can be invoked on the
  class itself as well as on an instance.

  @param node: the node whose job queue is purged

  """
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename job queue files on the given nodes.

  This is a multi-node call; as a classmethod it can be invoked on the
  class itself as well as on an instance.

  @type node_list: list
  @param node_list: the nodes to contact
  @param address_list: node addresses, passed on to the RPC layer
  @param rename: used as-is as the RPC argument list (not wrapped in
      another list); presumably a list of (old, new) name pairs

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  # Validate the cluster-level defaults for this hypervisor filled with
  # the given parameters (presumably hvparams override the defaults),
  # not just the bare overrides
  hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Create a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type validity: int
  @param validity: Validity in seconds

  """
  cert_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", cert_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Remove an X509 certificate.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Certificate name

  """
  cert_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", cert_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, component,
                      dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: import/export options object (serialized via C{ToDict})
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is being imported
  @param dest: import destination, sent to the node and also used to
      encode C{dest_args}
  @param dest_args: destination arguments, encoded together with C{dest}

  """
  return self._SingleNodeCall(node, "import_start",
                              [opts.ToDict(),
                               self._InstDict(instance), component, dest,
                               _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, component, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: import/export options object (serialized via C{ToDict})
  @param host: the host to export to, forwarded unchanged
  @param port: the port to export to, forwarded unchanged
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is being exported
  @param source: export source, sent to the node and also used to
      encode C{source_args}
  @param source_args: source arguments, encoded together with C{source}

  """
  return self._SingleNodeCall(node, "export_start",
                              [opts.ToDict(), host, port,
                               self._InstDict(instance),
                               component, source,
                               _EncodeImportExportIO(source, source_args)])