#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Inter-node RPC library.

"""

# pylint: disable-msg=C0103,R0201,R0904
# C0103: Invalid name, since call_ are not valid
# R0201: Method could be a function, we keep all rpcs instance methods
# as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
# R0904: Too many public methods

import os
import logging
import zlib
import base64

from ganeti import utils
from ganeti import objects
from ganeti import http
from ganeti import serializer
from ganeti import constants
from ganeti import errors

# pylint has a bug here, doesn't see this import
import ganeti.http.client # pylint: disable-msg=W0611

# Module level variable holding the shared HTTP client manager; it is
# assigned when the module is initialized and used by Client.GetResults
_http_manager = None

# Various time constants (in seconds) for the timeout table
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour
# NOTE(review): two short definitions were lost at this position; _TMO_1DAY
# is referenced by the RPC declarations in this file, _TMO_4HRS is presumed
# from the naming pattern -- confirm against upstream
_TMO_4HRS = 4 * 3600 # four hours
_TMO_1DAY = 86400 # one day

# Timeout table that will be built later by decorators
# Guidelines for choosing timeouts:
# - call used during watcher: timeout -> 1min, _TMO_URGENT
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
# - other calls: 15 min, _TMO_NORMAL
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
_TIMEOUTS = {}


def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function, and only once: a second
  call trips the assertion below.

  """
  global _http_manager # pylint: disable-msg=W0603

  assert not _http_manager, "RPC module initialized more than once"

  # NOTE(review): the function header and one short statement at this
  # position were lost in extraction; the name Init and the SSL setup call
  # are restored from memory of upstream -- confirm
  http.InitSsl()

  _http_manager = http.client.HttpClientManager()


def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program.

  """
  global _http_manager # pylint: disable-msg=W0603

  # NOTE(review): the function header, this guard and the reset below were
  # lost in extraction and are reconstructed -- confirm against upstream
  if _http_manager:
    _http_manager.Shutdown()
    _http_manager = None


def _RpcTimeout(secs):
  """Timeout decorator.

  When applied to a rpc call_* function, it updates the global timeout
  table with the given function/timeout. The table key is the function
  name with the "call_" prefix removed.

  @param secs: the timeout value to store for the decorated call
  @return: a decorator that records the timeout and returns the function
      it was given

  """
  def decorator(f):
    name = f.__name__
    assert name.startswith("call_")
    _TIMEOUTS[name[len("call_"):]] = secs
    return f
  return decorator


class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed
  @ivar payload: the second element of a well-formed, successful result,
      None otherwise

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Classifies the raw result and fills in fail_msg/payload.

    A well-formed result is a two-element tuple or list whose first
    element is the success flag and whose second element is either the
    payload (on success) or the error message (on failure).

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    # every branch above must have set the complete attribute set
    assert hasattr(self, "call")
    assert hasattr(self, "data")
    assert hasattr(self, "fail_msg")
    assert hasattr(self, "node")
    assert hasattr(self, "offline")
    assert hasattr(self, "payload")

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: prefix for the error text; if empty or None a default
        message naming the call and the node is used
    @param prereq: if true, raise OpPrereqError instead of OpExecError
    @param ecode: if not None, passed as second argument to the exception

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable-msg=W0142


class Client:
  """RPC Client class.

  This class, given a (remote) method name, a list of parameters and a
  list of nodes, will contact (in parallel) all nodes, and return a
  dict of results (key: node name, value: result).

  One current bug is that generic failure is still signaled by
  'False' result, which is not good. This overloading of values can
  cause bugs.

  """
  def __init__(self, procedure, body, port):
    """Prepares a client for one remote procedure.

    @param procedure: the remote procedure name; it must already be
        present in the timeout table
    @param body: the request body sent to every node
    @param port: the port to connect to on every node

    """
    # NOTE(review): the tail of this message was lost in extraction and is
    # restored from memory of upstream -- confirm
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                    " timeouts table")
    self.procedure = procedure
    self.body = body
    self.port = port
    # pending requests, keyed by node name
    self.nc = {}

    self._ssl_params = \
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
                         ssl_cert_path=constants.NODED_CERT_FILE)

  def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Add a list of nodes to the target nodes.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation

    """
    if address_list is None:
      address_list = [None for _ in node_list]
    else:
      assert len(node_list) == len(address_list), \
             "Name and address lists should have the same length"
    for node, address in zip(node_list, address_list):
      self.ConnectNode(node, address, read_timeout=read_timeout)

  def ConnectNode(self, name, address=None, read_timeout=None):
    """Add a node to the target list.

    @type name: str
    @param name: the node name
    @type address: str
    @keyword address: the node address, if known; the node name is used
        when it is None
    @type read_timeout: int
    @param read_timeout: overwrites the timeout-table value of the
        procedure when not None

    """
    if address is None:
      address = name

    if read_timeout is None:
      read_timeout = _TIMEOUTS[self.procedure]

    self.nc[name] = \
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
                                    "/%s" % self.procedure,
                                    post_data=self.body,
                                    ssl_params=self._ssl_params,
                                    ssl_verify_peer=True,
                                    read_timeout=read_timeout)

  def GetResults(self):
    """Call nodes and return results.

    @rtype: dict
    @return: RPC results as L{RpcResult} objects, keyed by node name

    """
    assert _http_manager, "RPC module not initialized"

    _http_manager.ExecRequests(self.nc.values())

    results = {}

    for name, req in self.nc.iteritems():
      if req.success and req.resp_status_code == http.HTTP_OK:
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
                                  node=name, call=self.procedure)
        continue

      # TODO: Better error reporting
      if req.error:
        msg = req.error
      else:
        msg = req.resp_body

      logging.error("RPC error in %s from node %s: %s",
                    self.procedure, name, msg)
      results[name] = RpcResult(data=msg, failed=True, node=name,
                                call=self.procedure)

    return results


def _EncodeImportExportIO(ieio, ieioargs):
  """Encodes import/export I/O information.

  For the raw-disk and script I/O types the first argument is replaced
  by its ToDict() form.

  @param ieio: the I/O type, compared against the constants.IEIO_* values
  @param ieioargs: the arguments belonging to that I/O type
  @return: the encoded arguments

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieioargs[0].ToDict(), )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieioargs[0].ToDict(), ieioargs[1])

  # NOTE(review): the final statement was lost in extraction; passing the
  # arguments through unchanged is presumed -- confirm against upstream
  return ieioargs


class RpcRunner(object):
  """RPC runner class"""

  def __init__(self, cfg):
    """Initializes the rpc runner.

    @type cfg:  C{config.ConfigWriter}
    @param cfg: the configuration object that will be used to get data
                about the cluster

    """
    self._cfg = cfg
    self.port = utils.GetDaemonPort(constants.NODED)

332 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
333 """Convert the given instance to a dict.
335 This is done via the instance's ToDict() method and additionally
336 we fill the hvparams with the cluster defaults.
338 @type instance: L{objects.Instance}
339 @param instance: an Instance object
340 @type hvp: dict or None
341 @param hvp: a dictionary with overridden hypervisor parameters
342 @type bep: dict or None
343 @param bep: a dictionary with overridden backend parameters
344 @type osp: dict or None
345 @param osp: a dictionary with overriden os parameters
347 @return: the instance dict, with the hvparams filled with the
351 idict = instance.ToDict()
352 cluster = self._cfg.GetClusterInfo()
353 idict["hvparams"] = cluster.FillHV(instance)
355 idict["hvparams"].update(hvp)
356 idict["beparams"] = cluster.FillBE(instance)
358 idict["beparams"].update(bep)
359 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
361 idict["osparams"].update(osp)
362 for nic in idict["nics"]:
363 nic['nicparams'] = objects.FillDict(
364 cluster.nicparams[constants.PP_DEFAULT],
368 def _ConnectList(self, client, node_list, call, read_timeout=None):
369 """Helper for computing node addresses.
371 @type client: L{ganeti.rpc.Client}
372 @param client: a C{Client} instance
373 @type node_list: list
374 @param node_list: the node list we should connect
376 @param call: the name of the remote procedure call, for filling in
377 correctly any eventual offline nodes' results
378 @type read_timeout: int
379 @param read_timeout: overwrites the default read timeout for the
383 all_nodes = self._cfg.GetAllNodesInfo()
387 for node in node_list:
388 if node in all_nodes:
389 if all_nodes[node].offline:
390 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
392 val = all_nodes[node].primary_ip
395 addr_list.append(val)
396 name_list.append(node)
398 client.ConnectList(name_list, address_list=addr_list,
399 read_timeout=read_timeout)
402 def _ConnectNode(self, client, node, call, read_timeout=None):
403 """Helper for computing one node's address.
405 @type client: L{ganeti.rpc.Client}
406 @param client: a C{Client} instance
408 @param node: the node we should connect
410 @param call: the name of the remote procedure call, for filling in
411 correctly any eventual offline nodes' results
412 @type read_timeout: int
413 @param read_timeout: overwrites the default read timeout for the
417 node_info = self._cfg.GetNodeInfo(node)
418 if node_info is not None:
419 if node_info.offline:
420 return RpcResult(node=node, offline=True, call=call)
421 addr = node_info.primary_ip
424 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
426 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
427 """Helper for making a multi-node call
430 body = serializer.DumpJson(args, indent=False)
431 c = Client(procedure, body, self.port)
432 skip_dict = self._ConnectList(c, node_list, procedure,
433 read_timeout=read_timeout)
434 skip_dict.update(c.GetResults())
438 def _StaticMultiNodeCall(cls, node_list, procedure, args,
439 address_list=None, read_timeout=None):
440 """Helper for making a multi-node static call
443 body = serializer.DumpJson(args, indent=False)
444 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
445 c.ConnectList(node_list, address_list=address_list,
446 read_timeout=read_timeout)
447 return c.GetResults()
449 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
450 """Helper for making a single-node call
453 body = serializer.DumpJson(args, indent=False)
454 c = Client(procedure, body, self.port)
455 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
457 # we did connect, node is not offline
458 result = c.GetResults()[node]
462 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
463 """Helper for making a single-node static call
466 body = serializer.DumpJson(args, indent=False)
467 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
468 c.ConnectNode(node, read_timeout=read_timeout)
469 return c.GetResults()[node]
473 """Compresses a string for transport over RPC.
475 Small amounts of data are not compressed.
480 @return: Encoded data to send
483 # Small amounts of data are not compressed
485 return (constants.RPC_ENCODING_NONE, data)
487 # Compress with zlib and encode in base64
488 return (constants.RPC_ENCODING_ZLIB_BASE64,
489 base64.b64encode(zlib.compress(data, 3)))
495 @_RpcTimeout(_TMO_URGENT)
496 def call_lv_list(self, node_list, vg_name):
497 """Gets the logical volumes present in a given volume group.
499 This is a multi-node call.
502 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
504 @_RpcTimeout(_TMO_URGENT)
505 def call_vg_list(self, node_list):
506 """Gets the volume group list.
508 This is a multi-node call.
511 return self._MultiNodeCall(node_list, "vg_list", [])
513 @_RpcTimeout(_TMO_NORMAL)
514 def call_storage_list(self, node_list, su_name, su_args, name, fields):
515 """Get list of storage units.
517 This is a multi-node call.
520 return self._MultiNodeCall(node_list, "storage_list",
521 [su_name, su_args, name, fields])
523 @_RpcTimeout(_TMO_NORMAL)
524 def call_storage_modify(self, node, su_name, su_args, name, changes):
525 """Modify a storage unit.
527 This is a single-node call.
530 return self._SingleNodeCall(node, "storage_modify",
531 [su_name, su_args, name, changes])
533 @_RpcTimeout(_TMO_NORMAL)
534 def call_storage_execute(self, node, su_name, su_args, name, op):
535 """Executes an operation on a storage unit.
537 This is a single-node call.
540 return self._SingleNodeCall(node, "storage_execute",
541 [su_name, su_args, name, op])
543 @_RpcTimeout(_TMO_URGENT)
544 def call_bridges_exist(self, node, bridges_list):
545 """Checks if a node has all the bridges given.
547 This method checks if all bridges given in the bridges_list are
548 present on the remote node, so that an instance that uses interfaces
549 on those bridges can be started.
551 This is a single-node call.
554 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
556 @_RpcTimeout(_TMO_NORMAL)
557 def call_instance_start(self, node, instance, hvp, bep):
558 """Starts an instance.
560 This is a single-node call.
563 idict = self._InstDict(instance, hvp=hvp, bep=bep)
564 return self._SingleNodeCall(node, "instance_start", [idict])
566 @_RpcTimeout(_TMO_NORMAL)
567 def call_instance_shutdown(self, node, instance, timeout):
568 """Stops an instance.
570 This is a single-node call.
573 return self._SingleNodeCall(node, "instance_shutdown",
574 [self._InstDict(instance), timeout])
576 @_RpcTimeout(_TMO_NORMAL)
577 def call_migration_info(self, node, instance):
578 """Gather the information necessary to prepare an instance migration.
580 This is a single-node call.
583 @param node: the node on which the instance is currently running
584 @type instance: C{objects.Instance}
585 @param instance: the instance definition
588 return self._SingleNodeCall(node, "migration_info",
589 [self._InstDict(instance)])
591 @_RpcTimeout(_TMO_NORMAL)
592 def call_accept_instance(self, node, instance, info, target):
593 """Prepare a node to accept an instance.
595 This is a single-node call.
598 @param node: the target node for the migration
599 @type instance: C{objects.Instance}
600 @param instance: the instance definition
601 @type info: opaque/hypervisor specific (string/data)
602 @param info: result for the call_migration_info call
604 @param target: target hostname (usually ip address) (on the node itself)
607 return self._SingleNodeCall(node, "accept_instance",
608 [self._InstDict(instance), info, target])
610 @_RpcTimeout(_TMO_NORMAL)
611 def call_finalize_migration(self, node, instance, info, success):
612 """Finalize any target-node migration specific operation.
614 This is called both in case of a successful migration and in case of error
615 (in which case it should abort the migration).
617 This is a single-node call.
620 @param node: the target node for the migration
621 @type instance: C{objects.Instance}
622 @param instance: the instance definition
623 @type info: opaque/hypervisor specific (string/data)
624 @param info: result for the call_migration_info call
625 @type success: boolean
626 @param success: whether the migration was a success or a failure
629 return self._SingleNodeCall(node, "finalize_migration",
630 [self._InstDict(instance), info, success])
632 @_RpcTimeout(_TMO_SLOW)
633 def call_instance_migrate(self, node, instance, target, live):
634 """Migrate an instance.
636 This is a single-node call.
639 @param node: the node on which the instance is currently running
640 @type instance: C{objects.Instance}
641 @param instance: the instance definition
643 @param target: the target node name
645 @param live: whether the migration should be done live or not (the
646 interpretation of this parameter is left to the hypervisor)
649 return self._SingleNodeCall(node, "instance_migrate",
650 [self._InstDict(instance), target, live])
652 @_RpcTimeout(_TMO_NORMAL)
653 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
654 """Reboots an instance.
656 This is a single-node call.
659 return self._SingleNodeCall(node, "instance_reboot",
660 [self._InstDict(inst), reboot_type,
663 @_RpcTimeout(_TMO_1DAY)
664 def call_instance_os_add(self, node, inst, reinstall, debug):
665 """Installs an OS on the given instance.
667 This is a single-node call.
670 return self._SingleNodeCall(node, "instance_os_add",
671 [self._InstDict(inst), reinstall, debug])
673 @_RpcTimeout(_TMO_SLOW)
674 def call_instance_run_rename(self, node, inst, old_name, debug):
675 """Run the OS rename script for an instance.
677 This is a single-node call.
680 return self._SingleNodeCall(node, "instance_run_rename",
681 [self._InstDict(inst), old_name, debug])
683 @_RpcTimeout(_TMO_URGENT)
684 def call_instance_info(self, node, instance, hname):
685 """Returns information about a single instance.
687 This is a single-node call.
690 @param node: the list of nodes to query
691 @type instance: string
692 @param instance: the instance name
694 @param hname: the hypervisor type of the instance
697 return self._SingleNodeCall(node, "instance_info", [instance, hname])
699 @_RpcTimeout(_TMO_NORMAL)
700 def call_instance_migratable(self, node, instance):
701 """Checks whether the given instance can be migrated.
703 This is a single-node call.
705 @param node: the node to query
706 @type instance: L{objects.Instance}
707 @param instance: the instance to check
711 return self._SingleNodeCall(node, "instance_migratable",
712 [self._InstDict(instance)])
714 @_RpcTimeout(_TMO_URGENT)
715 def call_all_instances_info(self, node_list, hypervisor_list):
716 """Returns information about all instances on the given nodes.
718 This is a multi-node call.
720 @type node_list: list
721 @param node_list: the list of nodes to query
722 @type hypervisor_list: list
723 @param hypervisor_list: the hypervisors to query for instances
726 return self._MultiNodeCall(node_list, "all_instances_info",
729 @_RpcTimeout(_TMO_URGENT)
730 def call_instance_list(self, node_list, hypervisor_list):
731 """Returns the list of running instances on a given node.
733 This is a multi-node call.
735 @type node_list: list
736 @param node_list: the list of nodes to query
737 @type hypervisor_list: list
738 @param hypervisor_list: the hypervisors to query for instances
741 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
743 @_RpcTimeout(_TMO_FAST)
744 def call_node_tcp_ping(self, node, source, target, port, timeout,
746 """Do a TcpPing on the remote node
748 This is a single-node call.
751 return self._SingleNodeCall(node, "node_tcp_ping",
752 [source, target, port, timeout,
755 @_RpcTimeout(_TMO_FAST)
756 def call_node_has_ip_address(self, node, address):
757 """Checks if a node has the given IP address.
759 This is a single-node call.
762 return self._SingleNodeCall(node, "node_has_ip_address", [address])
764 @_RpcTimeout(_TMO_URGENT)
765 def call_node_info(self, node_list, vg_name, hypervisor_type):
766 """Return node information.
768 This will return memory information and volume group size and free
771 This is a multi-node call.
773 @type node_list: list
774 @param node_list: the list of nodes to query
775 @type vg_name: C{string}
776 @param vg_name: the name of the volume group to ask for disk space
778 @type hypervisor_type: C{str}
779 @param hypervisor_type: the name of the hypervisor to ask for
783 return self._MultiNodeCall(node_list, "node_info",
784 [vg_name, hypervisor_type])
786 @_RpcTimeout(_TMO_NORMAL)
787 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
788 """Add a node to the cluster.
790 This is a single-node call.
793 return self._SingleNodeCall(node, "node_add",
794 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
796 @_RpcTimeout(_TMO_NORMAL)
797 def call_node_verify(self, node_list, checkdict, cluster_name):
798 """Request verification of given parameters.
800 This is a multi-node call.
803 return self._MultiNodeCall(node_list, "node_verify",
804 [checkdict, cluster_name])
807 @_RpcTimeout(_TMO_FAST)
808 def call_node_start_master(cls, node, start_daemons, no_voting):
809 """Tells a node to activate itself as a master.
811 This is a single-node call.
814 return cls._StaticSingleNodeCall(node, "node_start_master",
815 [start_daemons, no_voting])
818 @_RpcTimeout(_TMO_FAST)
819 def call_node_stop_master(cls, node, stop_daemons):
820 """Tells a node to demote itself from master status.
822 This is a single-node call.
825 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
828 @_RpcTimeout(_TMO_URGENT)
829 def call_master_info(cls, node_list):
830 """Query master info.
832 This is a multi-node call.
835 # TODO: should this method query down nodes?
836 return cls._StaticMultiNodeCall(node_list, "master_info", [])
839 @_RpcTimeout(_TMO_URGENT)
840 def call_version(cls, node_list):
841 """Query node version.
843 This is a multi-node call.
846 return cls._StaticMultiNodeCall(node_list, "version", [])
848 @_RpcTimeout(_TMO_NORMAL)
849 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
850 """Request creation of a given block device.
852 This is a single-node call.
855 return self._SingleNodeCall(node, "blockdev_create",
856 [bdev.ToDict(), size, owner, on_primary, info])
858 @_RpcTimeout(_TMO_NORMAL)
859 def call_blockdev_remove(self, node, bdev):
860 """Request removal of a given block device.
862 This is a single-node call.
865 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
867 @_RpcTimeout(_TMO_NORMAL)
868 def call_blockdev_rename(self, node, devlist):
869 """Request rename of the given block devices.
871 This is a single-node call.
874 return self._SingleNodeCall(node, "blockdev_rename",
875 [(d.ToDict(), uid) for d, uid in devlist])
877 @_RpcTimeout(_TMO_NORMAL)
878 def call_blockdev_assemble(self, node, disk, owner, on_primary):
879 """Request assembling of a given block device.
881 This is a single-node call.
884 return self._SingleNodeCall(node, "blockdev_assemble",
885 [disk.ToDict(), owner, on_primary])
887 @_RpcTimeout(_TMO_NORMAL)
888 def call_blockdev_shutdown(self, node, disk):
889 """Request shutdown of a given block device.
891 This is a single-node call.
894 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
896 @_RpcTimeout(_TMO_NORMAL)
897 def call_blockdev_addchildren(self, node, bdev, ndevs):
898 """Request adding a list of children to a (mirroring) device.
900 This is a single-node call.
903 return self._SingleNodeCall(node, "blockdev_addchildren",
905 [disk.ToDict() for disk in ndevs]])
907 @_RpcTimeout(_TMO_NORMAL)
908 def call_blockdev_removechildren(self, node, bdev, ndevs):
909 """Request removing a list of children from a (mirroring) device.
911 This is a single-node call.
914 return self._SingleNodeCall(node, "blockdev_removechildren",
916 [disk.ToDict() for disk in ndevs]])
918 @_RpcTimeout(_TMO_NORMAL)
919 def call_blockdev_getmirrorstatus(self, node, disks):
920 """Request status of a (mirroring) device.
922 This is a single-node call.
925 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
926 [dsk.ToDict() for dsk in disks])
927 if not result.fail_msg:
928 result.payload = [objects.BlockDevStatus.FromDict(i)
929 for i in result.payload]
932 @_RpcTimeout(_TMO_NORMAL)
933 def call_blockdev_find(self, node, disk):
934 """Request identification of a given block device.
936 This is a single-node call.
939 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
940 if not result.fail_msg and result.payload is not None:
941 result.payload = objects.BlockDevStatus.FromDict(result.payload)
944 @_RpcTimeout(_TMO_NORMAL)
945 def call_blockdev_close(self, node, instance_name, disks):
946 """Closes the given block devices.
948 This is a single-node call.
951 params = [instance_name, [cf.ToDict() for cf in disks]]
952 return self._SingleNodeCall(node, "blockdev_close", params)
954 @_RpcTimeout(_TMO_NORMAL)
955 def call_blockdev_getsizes(self, node, disks):
956 """Returns the size of the given disks.
958 This is a single-node call.
961 params = [[cf.ToDict() for cf in disks]]
962 return self._SingleNodeCall(node, "blockdev_getsize", params)
964 @_RpcTimeout(_TMO_NORMAL)
965 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
966 """Disconnects the network of the given drbd devices.
968 This is a multi-node call.
971 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
972 [nodes_ip, [cf.ToDict() for cf in disks]])
974 @_RpcTimeout(_TMO_NORMAL)
975 def call_drbd_attach_net(self, node_list, nodes_ip,
976 disks, instance_name, multimaster):
977 """Disconnects the given drbd devices.
979 This is a multi-node call.
982 return self._MultiNodeCall(node_list, "drbd_attach_net",
983 [nodes_ip, [cf.ToDict() for cf in disks],
984 instance_name, multimaster])
986 @_RpcTimeout(_TMO_SLOW)
987 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
988 """Waits for the synchronization of drbd devices is complete.
990 This is a multi-node call.
993 return self._MultiNodeCall(node_list, "drbd_wait_sync",
994 [nodes_ip, [cf.ToDict() for cf in disks]])
997 @_RpcTimeout(_TMO_NORMAL)
998 def call_upload_file(cls, node_list, file_name, address_list=None):
1001 The node will refuse the operation in case the file is not on the
1004 This is a multi-node call.
1006 @type node_list: list
1007 @param node_list: the list of node names to upload to
1008 @type file_name: str
1009 @param file_name: the filename to upload
1010 @type address_list: list or None
1011 @keyword address_list: an optional list of node addresses, in order
1012 to optimize the RPC speed
1015 file_contents = utils.ReadFile(file_name)
1016 data = cls._Compress(file_contents)
1017 st = os.stat(file_name)
1018 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1019 st.st_atime, st.st_mtime]
1020 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1021 address_list=address_list)
1024 @_RpcTimeout(_TMO_NORMAL)
1025 def call_write_ssconf_files(cls, node_list, values):
1026 """Write ssconf files.
1028 This is a multi-node call.
1031 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1033 @_RpcTimeout(_TMO_FAST)
1034 def call_os_diagnose(self, node_list):
1035 """Request a diagnose of OS definitions.
1037 This is a multi-node call.
1040 return self._MultiNodeCall(node_list, "os_diagnose", [])
1042 @_RpcTimeout(_TMO_FAST)
1043 def call_os_get(self, node, name):
1044 """Returns an OS definition.
1046 This is a single-node call.
1049 result = self._SingleNodeCall(node, "os_get", [name])
1050 if not result.fail_msg and isinstance(result.payload, dict):
1051 result.payload = objects.OS.FromDict(result.payload)
1054 @_RpcTimeout(_TMO_FAST)
1055 def call_os_validate(self, required, nodes, name, checks, params):
1056 """Run a validation routine for a given OS.
1058 This is a multi-node call.
1061 return self._MultiNodeCall(nodes, "os_validate",
1062 [required, name, checks, params])
1064 @_RpcTimeout(_TMO_NORMAL)
1065 def call_hooks_runner(self, node_list, hpath, phase, env):
1066 """Call the hooks runner.
1069 - op: the OpCode instance
1070 - env: a dictionary with the environment
1072 This is a multi-node call.
1075 params = [hpath, phase, env]
1076 return self._MultiNodeCall(node_list, "hooks_runner", params)
1078 @_RpcTimeout(_TMO_NORMAL)
1079 def call_iallocator_runner(self, node, name, idata):
1080 """Call an iallocator on a remote node
1083 - name: the iallocator name
1084 - input: the json-encoded input string
1086 This is a single-node call.
1089 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1091 @_RpcTimeout(_TMO_NORMAL)
1092 def call_blockdev_grow(self, node, cf_bdev, amount):
1093 """Request a snapshot of the given block device.
1095 This is a single-node call.
1098 return self._SingleNodeCall(node, "blockdev_grow",
1099 [cf_bdev.ToDict(), amount])
1101 @_RpcTimeout(_TMO_1DAY)
1102 def call_blockdev_export(self, node, cf_bdev,
1103 dest_node, dest_path, cluster_name):
1104 """Export a given disk to another node.
1106 This is a single-node call.
1109 return self._SingleNodeCall(node, "blockdev_export",
1110 [cf_bdev.ToDict(), dest_node, dest_path,
1113 @_RpcTimeout(_TMO_NORMAL)
1114 def call_blockdev_snapshot(self, node, cf_bdev):
1115 """Request a snapshot of the given block device.
1117 This is a single-node call.
1120 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1122 @_RpcTimeout(_TMO_NORMAL)
1123 def call_finalize_export(self, node, instance, snap_disks):
1124 """Request the completion of an export operation.
1126 This writes the export config file, etc.
1128 This is a single-node call.
1132 for disk in snap_disks:
1133 if isinstance(disk, bool):
1134 flat_disks.append(disk)
1136 flat_disks.append(disk.ToDict())
1138 return self._SingleNodeCall(node, "finalize_export",
1139 [self._InstDict(instance), flat_disks])
1141 @_RpcTimeout(_TMO_FAST)
1142 def call_export_info(self, node, path):
1143 """Queries the export information in a given path.
1145 This is a single-node call.
1148 return self._SingleNodeCall(node, "export_info", [path])
1150 @_RpcTimeout(_TMO_FAST)
1151 def call_export_list(self, node_list):
1152 """Gets the stored exports list.
1154 This is a multi-node call.
1157 return self._MultiNodeCall(node_list, "export_list", [])
1159 @_RpcTimeout(_TMO_FAST)
1160 def call_export_remove(self, node, export):
1161 """Requests removal of a given export.
1163 This is a single-node call.
1166 return self._SingleNodeCall(node, "export_remove", [export])
1169 @_RpcTimeout(_TMO_NORMAL)
1170 def call_node_leave_cluster(cls, node, modify_ssh_setup):
1171 """Requests a node to clean the cluster information it has.
1173 This will remove the configuration information from the ganeti data
1176 This is a single-node call.
1179 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1182 @_RpcTimeout(_TMO_FAST)
1183 def call_node_volumes(self, node_list):
1184 """Gets all volumes on node(s).
1186 This is a multi-node call.
1189 return self._MultiNodeCall(node_list, "node_volumes", [])
1191 @_RpcTimeout(_TMO_FAST)
1192 def call_node_demote_from_mc(self, node):
1193 """Demote a node from the master candidate role.
1195 This is a single-node call.
1198 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1200 @_RpcTimeout(_TMO_NORMAL)
1201 def call_node_powercycle(self, node, hypervisor):
1202 """Tries to powercycle a node.
1204 This is a single-node call.
1207 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1210 def call_test_delay(self, node_list, duration):
1211 """Sleep for a fixed time on given node(s).
1213 This is a multi-node call.
1216 return self._MultiNodeCall(node_list, "test_delay", [duration],
1217 read_timeout=int(duration + 5))
1219 @_RpcTimeout(_TMO_FAST)
1220 def call_file_storage_dir_create(self, node, file_storage_dir):
1221 """Create the given file storage directory.
1223 This is a single-node call.
1226 return self._SingleNodeCall(node, "file_storage_dir_create",
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: the directory to remove; forwarded unchanged
      as the only argument of the remote procedure

  """
  return self._SingleNodeCall(node, "file_storage_dir_remove",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Renames a file storage directory on a node.

  This is a single-node call.

  @param node: the node to contact
  @param old_file_storage_dir: sent as the first argument
  @param new_file_storage_dir: sent as the second argument

  """
  rename_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rename_args)
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Updates the job queue on the given nodes.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: handed on as the C{address_list} keyword argument
  @param file_name: sent as the first argument of the remote procedure
  @param content: run through C{cls._Compress} and sent as the second
      argument

  """
  # NOTE(review): the first argument is named cls, so this relies on being
  # bound to the class; confirm the classmethod decorator is in place.
  send = cls._StaticMultiNodeCall
  payload = [file_name, cls._Compress(content)]
  return send(node_list, "jobqueue_update", payload,
              address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purges the job queue on a node.

  This is a single-node call.

  @param node: the node to contact

  """
  # NOTE(review): the first argument is named cls, so this relies on being
  # bound to the class; confirm the classmethod decorator is in place.
  # The remote procedure is invoked with an empty argument list.
  rpc_args = []
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Renames a job queue file on the given nodes.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: handed on as the C{address_list} keyword argument
  @param rename: used as-is as the argument list of the remote procedure
      (it is not wrapped in another list here)

  """
  # NOTE(review): the first argument is named cls, so this relies on being
  # bound to the class; confirm the classmethod decorator is in place.
  send = cls._StaticMultiNodeCall
  return send(node_list, "jobqueue_rename", rename, address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  # The nodes receive the combination (via objects.FillDict) of the
  # cluster's stored parameters for this hypervisor, or an empty dict if
  # it has none, and the parameters given by the caller.
  hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Asks a node to create a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: the node to contact
  @param validity: Validity in seconds

  """
  rpc_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Asks a node to remove an X509 certificate.

  This is a single-node call.

  @param node: the node to contact
  @param name: Certificate name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @param node: Node name
  @param opts: import options; sent in the form returned by its
      C{ToDict()} method
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: sent as-is and also used to encode C{dest_args}
  @param dest_args: encoded together with C{dest} by
      C{_EncodeImportExportIO} before being sent

  """
  return self._SingleNodeCall(node, "import_start",
                              [opts.ToDict(),
                               self._InstDict(instance), dest,
                               _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Asks a node to start an export daemon.

  This is a single-node call.

  @param node: Node name
  @param opts: export options; sent in the form returned by its
      C{ToDict()} method
  @param host: sent as-is
  @param port: sent as-is
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: sent as-is and also used to encode C{source_args}
  @param source_args: encoded together with C{source} by
      C{_EncodeImportExportIO} before being sent

  """
  rpc_args = [
    opts.ToDict(),
    host,
    port,
    self._InstDict(instance),
    source,
    _EncodeImportExportIO(source, source_args),
    ]
  return self._SingleNodeCall(node, "export_start", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @rtype: List of L{objects.ImportExportStatus} instances
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  # Only a successful call has a payload worth decoding
  if not result.fail_msg:
    decoded = []

    for entry in result.payload:
      if entry is None:
        # No status could be retrieved for this name; keep the placeholder
        decoded.append(None)
      else:
        decoded.append(objects.ImportExportStatus.FromDict(entry))

    result.payload = decoded

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Asks a node to abort an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "impexp_abort", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Asks a node to clean up after an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)