4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
51 # Special module generated at build time
52 from ganeti import _generated_rpc
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client # pylint: disable=W0611
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
74 # Timeout table that will be built later by decorators
75 # Guidelines for choosing timeouts:
76 # - call used during watcher: timeout -> 1min, _TMO_URGENT
77 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
78 # - other calls: 15 min, _TMO_NORMAL
79 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
84 #: Special value to describe an offline host
89 """Initializes the module-global HTTP client manager.
91 Must be called before using any RPC function and while exactly one thread is
95 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
96 # one thread running. This check is just a safety measure -- it doesn't
98 assert threading.activeCount() == 1, \
99 "Found more than one active thread when initializing pycURL"
101 logging.info("Using PycURL %s", pycurl.version)
103 pycurl.global_init(pycurl.GLOBAL_ALL)
107 """Stops the module-global HTTP client manager.
109 Must be called before quitting the program and while exactly one thread is
113 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the RPC client settings to a cURL handle.

  The node daemon certificate file serves as CA bundle, client certificate
  and client key at the same time.

  @param curl: handle whose C{setopt} method receives the options

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Options are applied in the order listed here
  options = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    # The peer certificate is verified, the host name in it is not
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in options:
    curl.setopt(option, value)
130 def _RpcTimeout(secs):
131 """Timeout decorator.
133 When applied to a rpc call_* function, it updates the global timeout
134 table with the given function/timeout.
139 assert name.startswith("call_")
140 _TIMEOUTS[name[len("call_"):]] = secs
146 """RPC-wrapper decorator.
148 When applied to a function, it runs it with the RPC system
149 initialized, and it shuts down the system afterwards. This means the
150 function must be called without RPC being initialized.
153 def wrapper(*args, **kwargs):
156 return fn(*args, **kwargs)
163 """Compresses a string for transport over RPC.
165 Small amounts of data are not compressed.
170 @return: Encoded data to send
173 # Small amounts of data are not compressed
175 return (constants.RPC_ENCODING_NONE, data)
177 # Compress with zlib and encode in base64
178 return (constants.RPC_ENCODING_ZLIB_BASE64,
179 base64.b64encode(zlib.compress(data, 3)))
182 class RpcResult(object):
185 This class holds an RPC result. It is needed since in multi-node
186 calls we can't raise an exception just because one out of many
187 failed, and therefore we use this class to encapsulate the result.
189 @ivar data: the data payload, for successful results, or None
190 @ivar call: the name of the RPC call
191 @ivar node: the name of the node to which we made the call
192 @ivar offline: whether the operation failed because the node was
193 offline, as opposed to actual failure; offline=True will always
194 imply failed=True, in order to allow simpler checking if
195 the user doesn't care about the exact failure mode
196 @ivar fail_msg: the error message if the call failed
199 def __init__(self, data=None, failed=False, offline=False,
200 call=None, node=None):
201 self.offline = offline
206 self.fail_msg = "Node is marked offline"
207 self.data = self.payload = None
209 self.fail_msg = self._EnsureErr(data)
210 self.data = self.payload = None
213 if not isinstance(self.data, (tuple, list)):
214 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
218 self.fail_msg = ("RPC layer error: invalid result length (%d), "
219 "expected 2" % len(self.data))
221 elif not self.data[0]:
222 self.fail_msg = self._EnsureErr(self.data[1])
227 self.payload = data[1]
229 for attr_name in ["call", "data", "fail_msg",
230 "node", "offline", "payload"]:
231 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
235 """Helper to ensure we return a 'True' value for error."""
239 return "No error information"
241 def Raise(self, msg, prereq=False, ecode=None):
242 """If the result has failed, raise an OpExecError.
244 This is used so that LU code doesn't have to check for each
245 result, but instead can call this function.
248 if not self.fail_msg:
251 if not msg: # one could pass None for default message
252 msg = ("Call '%s' to node '%s' has failed: %s" %
253 (self.call, self.node, self.fail_msg))
255 msg = "%s: %s" % (msg, self.fail_msg)
257 ec = errors.OpPrereqError
259 ec = errors.OpExecError
260 if ecode is not None:
264 raise ec(*args) # pylint: disable=W0142
267 def _SsconfResolver(node_list,
268 ssc=ssconf.SimpleStore,
269 nslookup_fn=netutils.Hostname.GetIP):
270 """Return addresses for given node names.
272 @type node_list: list
273 @param node_list: List of node names
275 @param ssc: SimpleStore class that is used to obtain node->ip mappings
276 @type nslookup_fn: callable
277 @param nslookup_fn: function used to do NS lookup
278 @rtype: list of tuple; (string, string)
279 @return: List of tuples containing node name and IP address
283 iplist = ss.GetNodePrimaryIPList()
284 family = ss.GetPrimaryIPFamily()
285 ipmap = dict(entry.split() for entry in iplist)
288 for node in node_list:
291 ip = nslookup_fn(node, family=family)
292 result.append((node, ip))
class _StaticResolver:
  """Resolver handing out a fixed, caller-supplied list of addresses.

  """
  def __init__(self, addresses):
    """Stores the addresses to hand out.

    @param addresses: one address per host, in the same order in which the
      hosts are later passed when the instance is called

    """
    self._addresses = addresses

  def __call__(self, hosts):
    """Pairs every host with the stored address at the same position.

    @param hosts: host names; there must be exactly as many as stored
      addresses
    @return: result of C{zip(hosts, addresses)}

    """
    known = self._addresses
    assert len(hosts) == len(known)
    return zip(hosts, known)
312 def _CheckConfigNode(name, node):
313 """Checks if a node is online.
316 @param name: Node name
317 @type node: L{objects.Node} or None
318 @param node: Node object
322 # Depend on DNS for name resolution
331 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
332 """Calculate node addresses using configuration.
335 # Special case for single-host lookups
338 return [_CheckConfigNode(name, single_node_fn(name))]
340 all_nodes = all_nodes_fn()
341 return [_CheckConfigNode(name, all_nodes.get(name, None))
346 def __init__(self, resolver, port, lock_monitor_cb=None):
347 """Initializes this class.
349 @param resolver: callable accepting a list of hostnames, returning a list
350 of tuples containing name and IP address (IP address can be the name or
351 the special value L{_OFFLINE} to mark offline machines)
353 @param port: TCP port
354 @param lock_monitor_cb: Callable for registering with lock monitor
357 self._resolver = resolver
359 self._lock_monitor_cb = lock_monitor_cb
362 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
363 """Prepares requests by sorting offline hosts into separate list.
369 for (name, ip) in hosts:
371 # Node is marked as offline
372 results[name] = RpcResult(node=name, offline=True, call=procedure)
375 http.client.HttpClientRequest(str(ip), port,
376 http.HTTP_PUT, str("/%s" % procedure),
377 headers=_RPC_CLIENT_HEADERS,
379 read_timeout=read_timeout,
380 nicename="%s/%s" % (name, procedure),
381 curl_config_fn=_ConfigRpcCurl)
383 return (results, requests)
386 def _CombineResults(results, requests, procedure):
387 """Combines pre-computed results for offline hosts with actual call results.
390 for name, req in requests.items():
391 if req.success and req.resp_status_code == http.HTTP_OK:
392 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393 node=name, call=procedure)
395 # TODO: Better error reporting
401 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402 host_result = RpcResult(data=msg, failed=True, node=name,
405 results[name] = host_result
def __call__(self, hosts, procedure, body, read_timeout=None,
             _req_process_fn=http.client.ProcessRequests):
  """Runs one RPC procedure against a number of nodes.

  @type hosts: sequence
  @param hosts: Hostnames
  @type procedure: string
  @param procedure: Request path
  @param body: Request body (converted with C{str} before use)
  @type read_timeout: int or None
  @param read_timeout: Read timeout for request; when None, the value
    registered for the procedure in the timeout table is used
  @param _req_process_fn: callable processing the prepared requests
  @return: whatever L{_CombineResults} produces for the call

  """
  timeout = read_timeout
  if timeout is None:
    timeout = _TIMEOUTS.get(procedure, None)

  assert timeout is not None, \
    "Missing RPC read timeout for procedure '%s'" % procedure

  resolved = self._resolver(hosts)
  (results, requests) = self._PrepareRequests(resolved, self._port, procedure,
                                              str(body), timeout)

  _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

  # A host must have either a pre-computed result or a request, never both
  assert frozenset(results).isdisjoint(requests)

  return self._CombineResults(results, requests, procedure)
440 def _EncodeImportExportIO(ieio, ieioargs):
441 """Encodes import/export I/O information.
444 if ieio == constants.IEIO_RAW_DISK:
445 assert len(ieioargs) == 1
446 return (ieioargs[0].ToDict(), )
448 if ieio == constants.IEIO_SCRIPT:
449 assert len(ieioargs) == 2
450 return (ieioargs[0].ToDict(), ieioargs[1])
455 class RpcRunner(_generated_rpc.RpcClientDefault):
def __init__(self, context):
  """Initializes the RPC runner.

  @type context: C{masterd.GanetiContext}
  @param context: Ganeti context; its C{cfg} attribute is kept as
    configuration and C{glm.AddToLockMonitor} is registered as lock monitor
    callback of the RPC processor

  """
  _generated_rpc.RpcClientDefault.__init__(self)

  self._cfg = context.cfg

  # Node addresses are looked up in the configuration
  resolver = compat.partial(_NodeConfigResolver,
                            self._cfg.GetNodeInfo,
                            self._cfg.GetAllNodesInfo)
  noded_port = netutils.GetDaemonPort(constants.NODED)

  self._proc = _RpcProcessor(resolver, noded_port,
                             lock_monitor_cb=context.glm.AddToLockMonitor)
475 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
476 """Convert the given instance to a dict.
478 This is done via the instance's ToDict() method and additionally
479 we fill the hvparams with the cluster defaults.
481 @type instance: L{objects.Instance}
482 @param instance: an Instance object
483 @type hvp: dict or None
484 @param hvp: a dictionary with overridden hypervisor parameters
485 @type bep: dict or None
486 @param bep: a dictionary with overridden backend parameters
487 @type osp: dict or None
488 @param osp: a dictionary with overridden os parameters
490 @return: the instance dict, with the hvparams filled with the
494 idict = instance.ToDict()
495 cluster = self._cfg.GetClusterInfo()
496 idict["hvparams"] = cluster.FillHV(instance)
498 idict["hvparams"].update(hvp)
499 idict["beparams"] = cluster.FillBE(instance)
501 idict["beparams"].update(bep)
502 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
504 idict["osparams"].update(osp)
505 for nic in idict["nics"]:
506 nic['nicparams'] = objects.FillDict(
507 cluster.nicparams[constants.PP_DEFAULT],
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
  """Sends one procedure call to all nodes of a list.

  The arguments are serialized to JSON and handed to the RPC processor.

  @param node_list: nodes to send the request to
  @param procedure: name of the procedure
  @param args: procedure arguments
  @param read_timeout: read timeout passed on to the RPC processor
  @return: the RPC processor's result

  """
  payload = serializer.DumpJson(args, indent=False)
  return self._proc(node_list, procedure, payload, read_timeout=read_timeout)
def _Call(self, node_list, procedure, timeout, args):
  """Entry point for automatically generated RPC wrappers.

  Forwards to L{_MultiNodeCall}, using C{timeout} as read timeout.

  """
  outcome = self._MultiNodeCall(node_list, procedure, args,
                                read_timeout=timeout)
  return outcome
525 def _StaticMultiNodeCall(node_list, procedure, args,
526 address_list=None, read_timeout=None):
527 """Helper for making a multi-node static call
530 body = serializer.DumpJson(args, indent=False)
532 if address_list is None:
533 resolver = _SsconfResolver
535 # Caller provided an address list
536 resolver = _StaticResolver(address_list)
538 proc = _RpcProcessor(resolver,
539 netutils.GetDaemonPort(constants.NODED))
540 return proc(node_list, procedure, body, read_timeout=read_timeout)
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
  """Sends one procedure call to a single node.

  The arguments are serialized to JSON, the RPC processor is invoked with a
  one-element node list and the entry for C{node} is picked from its result.

  @param node: node to send the request to
  @param procedure: name of the procedure
  @param args: procedure arguments
  @param read_timeout: read timeout passed on to the RPC processor
  @return: the result stored under C{node}

  """
  payload = serializer.DumpJson(args, indent=False)
  per_node = self._proc([node], procedure, payload, read_timeout=read_timeout)
  return per_node[node]
550 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
551 """Helper for making a single-node static call
554 body = serializer.DumpJson(args, indent=False)
555 proc = _RpcProcessor(_SsconfResolver,
556 netutils.GetDaemonPort(constants.NODED))
557 return proc([node], procedure, body, read_timeout=read_timeout)[node]
560 def _BlockdevFindPostProc(result):
561 if not result.fail_msg and result.payload is not None:
562 result.payload = objects.BlockDevStatus.FromDict(result.payload)
566 def _BlockdevGetMirrorStatusPostProc(result):
567 if not result.fail_msg:
568 result.payload = [objects.BlockDevStatus.FromDict(i)
569 for i in result.payload]
573 def _BlockdevGetMirrorStatusMultiPostProc(result):
574 for nres in result.values():
578 for idx, (success, status) in enumerate(nres.payload):
580 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
585 def _OsGetPostProc(result):
586 if not result.fail_msg and isinstance(result.payload, dict):
587 result.payload = objects.OS.FromDict(result.payload)
591 def _PrepareFinalizeExportDisks(snap_disks):
594 for disk in snap_disks:
595 if isinstance(disk, bool):
596 flat_disks.append(disk)
598 flat_disks.append(disk.ToDict())
603 def _ImpExpStatusPostProc(result):
604 """Post-processor for import/export status.
606 @rtype: Payload containing list of L{objects.ImportExportStatus} instances
607 @return: Returns a list of the state of each named import/export or None if
608 a status couldn't be retrieved
611 if not result.fail_msg:
614 for i in result.payload:
618 decoded.append(objects.ImportExportStatus.FromDict(i))
620 result.payload = decoded
@_RpcTimeout(_TMO_URGENT)
def call_bdev_sizes(self, node_list, devices):
  """Queries the sizes of the requested block devices.

  This is a multi-node call.

  @param node_list: nodes to send the request to
  @param devices: forwarded unchanged as the only procedure argument

  """
  params = [devices]
  return self._MultiNodeCall(node_list, "bdev_sizes", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Retrieves the list of storage units.

  This is a multi-node call. C{su_name}, C{su_args}, C{name} and C{fields}
  are forwarded unchanged, in this order, as procedure arguments.

  @param node_list: nodes to send the request to

  """
  params = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Modifies a storage unit.

  This is a single-node call. C{su_name}, C{su_args}, C{name} and
  C{changes} are forwarded unchanged, in this order, as procedure arguments.

  @param node: node to send the request to

  """
  params = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Runs an operation on a storage unit.

  This is a single-node call. C{su_name}, C{su_args}, C{name} and C{op} are
  forwarded unchanged, in this order, as procedure arguments.

  @param node: node to send the request to

  """
  params = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep, startup_paused):
  """Starts an instance.

  This is a single-node call.

  @param node: node to send the request to
  @param instance: instance, converted with L{_InstDict}
  @param hvp: handed to L{_InstDict} as C{hvp}
  @param bep: handed to L{_InstDict} as C{bep}
  @param startup_paused: forwarded unchanged as second procedure argument

  """
  instance_data = self._InstDict(instance, hvp=hvp, bep=bep)
  params = [instance_data, startup_paused]
  return self._SingleNodeCall(node, "instance_start", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Stops an instance.

  This is a single-node call.

  @param node: node to send the request to
  @param instance: instance, converted with L{_InstDict}
  @param timeout: forwarded unchanged as second procedure argument

  """
  params = [self._InstDict(instance), timeout]
  return self._SingleNodeCall(node, "instance_shutdown", params)
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Collects the information needed to prepare an instance migration.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition

  """
  params = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "migration_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepares a node for accepting an instance.

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @param target: target hostname (usually ip address) (on the node itself)

  """
  params = [self._InstDict(instance), info, target]
  return self._SingleNodeCall(node, "accept_instance", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_finalize_migration_dst(self, node, instance, info, success):
  """Finalizes target-node specific migration operations.

  Called after a successful migration as well as after a failed one (in
  which case the migration should be aborted).

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  params = [self._InstDict(instance), info, success]
  return self._SingleNodeCall(node, "instance_finalize_migration_dst", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrates an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
    interpretation of this parameter is left to the hypervisor)

  """
  params = [self._InstDict(instance), target, live]
  return self._SingleNodeCall(node, "instance_migrate", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_finalize_migration_src(self, node, instance, success, live):
  """Finalizes the migration of an instance on the source node.

  This is a single-node call.

  @param node: node to send the request to
  @type instance: L{objects.Instance}
  @param instance: the instance that was migrated
  @param success: whether the migration succeeded or not
  @param live: whether the user requested a live migration or not

  """
  params = [self._InstDict(instance), success, live]
  return self._SingleNodeCall(node, "instance_finalize_migration_src", params)
780 @_RpcTimeout(_TMO_SLOW)
781 def call_instance_get_migration_status(self, node, instance):
782 """Report migration status.
784 This is a single-node call that must be executed on the source node.
786 @type instance: L{objects.Instance}
787 @param instance: the instance that is being migrated
788 @rtype: L{objects.MigrationStatus}
789 @return: the status of the current migration (one of
790 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
791 progress info that can be retrieved from the hypervisor
794 result = self._SingleNodeCall(node, "instance_get_migration_status",
795 [self._InstDict(instance)])
796 if not result.fail_msg and result.payload is not None:
797 result.payload = objects.MigrationStatus.FromDict(result.payload)
800 @_RpcTimeout(_TMO_NORMAL)
801 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
802 """Reboots an instance.
804 This is a single-node call.
807 return self._SingleNodeCall(node, "instance_reboot",
808 [self._InstDict(inst), reboot_type,
811 @_RpcTimeout(_TMO_1DAY)
812 def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
813 """Installs an OS on the given instance.
815 This is a single-node call.
818 return self._SingleNodeCall(node, "instance_os_add",
819 [self._InstDict(inst, osp=osparams),
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Runs the OS rename script for an instance.

  This is a single-node call.

  @param node: node to send the request to
  @param inst: instance, converted with L{_InstDict}
  @param old_name: forwarded unchanged as second procedure argument
  @param debug: forwarded unchanged as third procedure argument

  """
  params = [self._InstDict(inst), old_name, debug]
  return self._SingleNodeCall(node, "instance_run_rename", params)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Retrieves information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance

  """
  params = [instance, hname]
  return self._SingleNodeCall(node, "instance_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Checks whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check

  """
  params = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "instance_migratable", params)
863 @_RpcTimeout(_TMO_URGENT)
864 def call_all_instances_info(self, node_list, hypervisor_list):
865 """Returns information about all instances on the given nodes.
867 This is a multi-node call.
869 @type node_list: list
870 @param node_list: the list of nodes to query
871 @type hypervisor_list: list
872 @param hypervisor_list: the hypervisors to query for instances
875 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Retrieves the list of running instances from the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  params = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", params)
893 @_RpcTimeout(_TMO_FAST)
894 def call_node_start_master_daemons(cls, node, no_voting):
895 """Starts master daemons on a node.
897 This is a single-node call.
900 return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
904 @_RpcTimeout(_TMO_FAST)
905 def call_node_activate_master_ip(cls, node):
906 """Activates master IP on a node.
908 This is a single-node call.
911 return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
914 @_RpcTimeout(_TMO_FAST)
915 def call_node_stop_master(cls, node):
916 """Deactivates master IP and stops master daemons on a node.
918 This is a single-node call.
921 return cls._StaticSingleNodeCall(node, "node_stop_master", [])
924 @_RpcTimeout(_TMO_FAST)
925 def call_node_deactivate_master_ip(cls, node):
926 """Deactivates master IP on a node.
928 This is a single-node call.
931 return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
934 @_RpcTimeout(_TMO_FAST)
935 def call_node_change_master_netmask(cls, node, netmask):
936 """Change master IP netmask.
938 This is a single-node call.
941 return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
945 @_RpcTimeout(_TMO_URGENT)
946 def call_master_info(cls, node_list):
947 """Query master info.
949 This is a multi-node call.
952 # TODO: should this method query down nodes?
953 return cls._StaticMultiNodeCall(node_list, "master_info", [])
956 @_RpcTimeout(_TMO_URGENT)
957 def call_version(cls, node_list):
958 """Query node version.
960 This is a multi-node call.
963 return cls._StaticMultiNodeCall(node_list, "version", [])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Requests the creation of a given block device.

  This is a single-node call.

  @param node: node to send the request to
  @param bdev: device; sent in its C{ToDict()} form
  @param size: forwarded unchanged
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @param info: forwarded unchanged

  """
  params = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", params)
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
  """Requests wiping a block device at a given offset with a given size.

  This is a single-node call.

  @param node: node to send the request to
  @param bdev: device; sent in its C{ToDict()} form
  @param offset: forwarded unchanged
  @param size: forwarded unchanged

  """
  params = [bdev.ToDict(), offset, size]
  return self._SingleNodeCall(node, "blockdev_wipe", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Requests the removal of a given block device.

  This is a single-node call.

  @param node: node to send the request to
  @param bdev: device; sent in its C{ToDict()} form

  """
  params = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Requests renaming the given block devices.

  This is a single-node call.

  @param node: node to send the request to
  @param devlist: iterable of two-element (device, uid) entries; each device
    is sent in its C{ToDict()} form, the second element is kept as is

  """
  renames = [(dev.ToDict(), unique_id) for (dev, unique_id) in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", [renames])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_pause_resume_sync(self, node, disks, pause):
  """Requests pausing or resuming the sync of the given block devices.

  This is a single-node call.

  @param node: node to send the request to
  @param disks: devices; each one is sent in its C{ToDict()} form
  @param pause: forwarded unchanged as second procedure argument

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
                              [disk_dicts, pause])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
  """Requests assembling a given block device.

  This is a single-node call.

  @param node: node to send the request to
  @param disk: device; sent in its C{ToDict()} form
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @param idx: forwarded unchanged

  """
  params = [disk.ToDict(), owner, on_primary, idx]
  return self._SingleNodeCall(node, "blockdev_assemble", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Requests the shutdown of a given block device.

  This is a single-node call.

  @param node: node to send the request to
  @param disk: device; sent in its C{ToDict()} form

  """
  params = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", params)
1033 @_RpcTimeout(_TMO_NORMAL)
1034 def call_blockdev_addchildren(self, node, bdev, ndevs):
1035 """Request adding a list of children to a (mirroring) device.
1037 This is a single-node call.
1040 return self._SingleNodeCall(node, "blockdev_addchildren",
1042 [disk.ToDict() for disk in ndevs]])
1044 @_RpcTimeout(_TMO_NORMAL)
1045 def call_blockdev_removechildren(self, node, bdev, ndevs):
1046 """Request removing a list of children from a (mirroring) device.
1048 This is a single-node call.
1051 return self._SingleNodeCall(node, "blockdev_removechildren",
1053 [disk.ToDict() for disk in ndevs]])
1055 @_RpcTimeout(_TMO_NORMAL)
1056 def call_blockdev_getmirrorstatus(self, node, disks):
1057 """Request status of a (mirroring) device.
1059 This is a single-node call.
1062 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1063 [dsk.ToDict() for dsk in disks])
1064 if not result.fail_msg:
1065 result.payload = [objects.BlockDevStatus.FromDict(i)
1066 for i in result.payload]
1069 @_RpcTimeout(_TMO_NORMAL)
1070 def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1071 """Request status of (mirroring) devices from multiple nodes.
1073 This is a multi-node call.
1076 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1077 [dict((name, [dsk.ToDict() for dsk in disks])
1078 for name, disks in node_disks.items())])
1079 for nres in result.values():
1083 for idx, (success, status) in enumerate(nres.payload):
1085 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1089 @_RpcTimeout(_TMO_NORMAL)
1090 def call_blockdev_find(self, node, disk):
1091 """Request identification of a given block device.
1093 This is a single-node call.
1096 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1097 if not result.fail_msg and result.payload is not None:
1098 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes the given block devices.

  This is a single-node call.

  @param node: node to send the request to
  @param instance_name: forwarded unchanged as first procedure argument
  @param disks: devices; each one is sent in its C{ToDict()} form

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Retrieves the size of the given disks.

  This is a single-node call.

  @param node: node to send the request to
  @param disks: devices; each one is sent in its C{ToDict()} form

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given drbd devices.

  This is a multi-node call.

  @param node_list: nodes to send the request to
  @param nodes_ip: forwarded unchanged as first procedure argument
  @param disks: devices; each one is sent in its C{ToDict()} form

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches the network of the given drbd devices.

  This is a multi-node call.

  @param node_list: nodes to send the request to
  @param nodes_ip: forwarded unchanged as first procedure argument
  @param disks: devices; each one is sent in its C{ToDict()} form
  @param instance_name: forwarded unchanged as third procedure argument
  @param multimaster: forwarded unchanged as fourth procedure argument

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  params = [nodes_ip, disk_dicts, instance_name, multimaster]
  return self._MultiNodeCall(node_list, "drbd_attach_net", params)
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of drbd devices is complete.

  This is a multi-node call.

  @param node_list: nodes to send the request to
  @param nodes_ip: forwarded unchanged as first procedure argument
  @param disks: devices; each one is sent in its C{ToDict()} form

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Retrieves the drbd helper.

  This is a multi-node call; the procedure takes no arguments.

  @param node_list: nodes to send the request to

  """
  no_params = []
  return self._MultiNodeCall(node_list, "drbd_helper", no_params)
1163 @_RpcTimeout(_TMO_NORMAL)
1164 def call_upload_file(cls, node_list, file_name, address_list=None):
1167 The node will refuse the operation in case the file is not on the
1170 This is a multi-node call.
1172 @type node_list: list
1173 @param node_list: the list of node names to upload to
1174 @type file_name: str
1175 @param file_name: the filename to upload
1176 @type address_list: list or None
1177 @keyword address_list: an optional list of node addresses, in order
1178 to optimize the RPC speed
1181 file_contents = utils.ReadFile(file_name)
1182 data = _Compress(file_contents)
1183 st = os.stat(file_name)
1184 getents = runtime.GetEnts()
1185 params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1186 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1187 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1188 address_list=address_list)
1191 @_RpcTimeout(_TMO_NORMAL)
1192 def call_write_ssconf_files(cls, node_list, values):
1193 """Write ssconf files.
1195 This is a multi-node call.
1198 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
  """Requests growing the given block device.

  This is a single-node call.

  @param node: node to send the request to
  @param cf_bdev: device; sent in its C{ToDict()} form
  @param amount: forwarded unchanged as second procedure argument
  @param dryrun: forwarded unchanged as third procedure argument

  """
  params = [cf_bdev.ToDict(), amount, dryrun]
  return self._SingleNodeCall(node, "blockdev_grow", params)
1210 @_RpcTimeout(_TMO_1DAY)
1211 def call_blockdev_export(self, node, cf_bdev,
1212 dest_node, dest_path, cluster_name):
1213 """Export a given disk to another node.
1215 This is a single-node call.
1218 return self._SingleNodeCall(node, "blockdev_export",
1219 [cf_bdev.ToDict(), dest_node, dest_path,
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Requests a snapshot of the given block device.

  This is a single-node call.

  @param node: node to send the request to
  @param cf_bdev: device; sent in its C{ToDict()} form

  """
  params = [cf_bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_snapshot", params)
1232 @_RpcTimeout(_TMO_NORMAL)
1233 def call_node_leave_cluster(cls, node, modify_ssh_setup):
1234 """Requests a node to clean the cluster information it has.
1236 This will remove the configuration information from the ganeti data
1239 This is a single-node call.
1242 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
def call_test_delay(self, node_list, duration, read_timeout=None):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @param node_list: nodes to send the request to
  @param duration: sleep duration, forwarded as the only procedure argument
  @param read_timeout: must be left at None; the read timeout is always
    derived from C{duration}
  @return: the result of L{_MultiNodeCall}

  """
  assert read_timeout is None

  # Issue the request directly instead of calling this method again: a
  # nested call with "read_timeout" set would fail the assertion above (or
  # recurse without end when assertions are disabled).
  # The read timeout leaves five seconds of slack on top of the delay.
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=int(duration + 5))
1256 @_RpcTimeout(_TMO_URGENT)
1257 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1258 """Update job queue.
1260 This is a multi-node call.
1263 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1264 [file_name, _Compress(content)],
1265 address_list=address_list)
1268 @_RpcTimeout(_TMO_NORMAL)
1269 def call_jobqueue_purge(cls, node):
1272 This is a single-node call.
1275 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1278 @_RpcTimeout(_TMO_URGENT)
1279 def call_jobqueue_rename(cls, node_list, address_list, rename):
1280 """Rename a job queue file.
1282 This is a multi-node call.
1285 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1286 address_list=address_list)
1288 @_RpcTimeout(_TMO_NORMAL)
1289 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1290 """Validate the hypervisor params.
1292 This is a multi-node call.
1294 @type node_list: list
1295 @param node_list: the list of nodes to query
1296 @type hvname: string
1297 @param hvname: the hypervisor name
1298 @type hvparams: dict
1299 @param hvparams: the hypervisor parameters to be validated
1302 cluster = self._cfg.GetClusterInfo()
1303 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1304 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1307 @_RpcTimeout(_TMO_NORMAL)
1308 def call_import_start(self, node, opts, instance, component,
1310 """Starts a listener for an import.
1312 This is a single-node call.
1315 @param node: Node name
1316 @type instance: C{objects.Instance}
1317 @param instance: Instance object
1318 @type component: string
1319 @param component: which part of the instance is being imported
1322 return self._SingleNodeCall(node, "import_start",
1324 self._InstDict(instance), component, dest,
1325 _EncodeImportExportIO(dest, dest_args)])
1327 @_RpcTimeout(_TMO_NORMAL)
1328 def call_export_start(self, node, opts, host, port,
1329 instance, component, source, source_args):
1330 """Starts an export daemon.
1332 This is a single-node call.
1335 @param node: Node name
1336 @type instance: C{objects.Instance}
1337 @param instance: Instance object
1338 @type component: string
1339 @param component: which part of the instance is being exported
1342 return self._SingleNodeCall(node, "export_start",
1343 [opts.ToDict(), host, port,
1344 self._InstDict(instance),
1346 _EncodeImportExportIO(source, source_args)])