4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client # pylint: disable-msg=W0611
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
59 global _http_manager # pylint: disable-msg=W0603
61 assert not _http_manager, "RPC module initialized more than once"
65 _http_manager = http.client.HttpClientManager()
69 """Stops the module-global HTTP client manager.
71 Must be called before quitting the program.
74 global _http_manager # pylint: disable-msg=W0603
77 _http_manager.Shutdown()
81 class RpcResult(object):
84 This class holds an RPC result. It is needed since in multi-node
85 calls we can't raise an exception just because one out of many
86 failed, and therefore we use this class to encapsulate the result.
88 @ivar data: the data payload, for successful results, or None
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
100 self.offline = offline
105 self.fail_msg = "Node is marked offline"
106 self.data = self.payload = None
108 self.fail_msg = self._EnsureErr(data)
109 self.data = self.payload = None
112 if not isinstance(self.data, (tuple, list)):
113 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
117 self.fail_msg = ("RPC layer error: invalid result length (%d), "
118 "expected 2" % len(self.data))
120 elif not self.data[0]:
121 self.fail_msg = self._EnsureErr(self.data[1])
126 self.payload = data[1]
128 assert hasattr(self, "call")
129 assert hasattr(self, "data")
130 assert hasattr(self, "fail_msg")
131 assert hasattr(self, "node")
132 assert hasattr(self, "offline")
133 assert hasattr(self, "payload")
137 """Helper to ensure we return a 'True' value for error."""
141 return "No error information"
143 def Raise(self, msg, prereq=False, ecode=None):
144 """If the result has failed, raise an OpExecError.
146 This is used so that LU code doesn't have to check for each
147 result, but instead can call this function.
150 if not self.fail_msg:
153 if not msg: # one could pass None for default message
154 msg = ("Call '%s' to node '%s' has failed: %s" %
155 (self.call, self.node, self.fail_msg))
157 msg = "%s: %s" % (msg, self.fail_msg)
159 ec = errors.OpPrereqError
161 ec = errors.OpExecError
162 if ecode is not None:
166 raise ec(*args) # pylint: disable-msg=W0142
172 This class, given a (remote) method name, a list of parameters and a
173 list of nodes, will contact (in parallel) all nodes, and return a
174 dict of results (key: node name, value: result).
176 One current bug is that generic failure is still signaled by
177 'False' result, which is not good. This overloading of values can
181 def __init__(self, procedure, body, port):
182 self.procedure = procedure
188 http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189 ssl_cert_path=constants.NODED_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
    """Register several nodes as targets of this call.

    Every node is handed to C{self.ConnectNode}, paired with the address
    found at the same position of C{address_list}.

    @type node_list: list
    @param node_list: the names of the nodes to connect
    @type address_list: list or None
    @keyword address_list: either None (no node gets an address) or a
        list of node addresses with the same length as C{node_list}

    """
    if address_list is not None:
        assert len(node_list) == len(address_list), \
            "Name and address lists should have the same length"
    else:
        # no addresses known: pair every name with None
        address_list = [None for _unused in node_list]

    for name, addr in zip(node_list, address_list):
        self.ConnectNode(name, addr)
209 def ConnectNode(self, name, address=None):
210 """Add a node to the target list.
213 @param name: the node name
215 @keyword address: the node address, if known
222 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223 "/%s" % self.procedure,
225 ssl_params=self._ssl_params,
226 ssl_verify_peer=True)
228 def GetResults(self):
229 """Call nodes and return results.
232 @return: List of RPC results
235 assert _http_manager, "RPC module not initialized"
237 _http_manager.ExecRequests(self.nc.values())
241 for name, req in self.nc.iteritems():
242 if req.success and req.resp_status_code == http.HTTP_OK:
243 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244 node=name, call=self.procedure)
247 # TODO: Better error reporting
253 logging.error("RPC error in %s from node %s: %s",
254 self.procedure, name, msg)
255 results[name] = RpcResult(data=msg, failed=True, node=name,
261 class RpcRunner(object):
262 """RPC runner class"""
def __init__(self, cfg):
    """Initialize the RPC runner.

    @type cfg: C{config.ConfigWriter}
    @param cfg: the configuration object that will be used to get data
        about the cluster; it is kept as C{self._cfg}

    """
    self._cfg = cfg
    # the node daemon port is looked up once and reused for every call
    noded_port = utils.GetDaemonPort(constants.NODED)
    self.port = noded_port
def _InstDict(self, instance, hvp=None, bep=None):
    """Turn an instance object into a dict with filled-in parameters.

    The dict comes from the instance's C{ToDict()} method; its hypervisor
    and backend parameters are replaced by the values computed by the
    cluster object (C{FillHV}/C{FillBE}), and the parameters of every NIC
    are filled from the cluster's default NIC parameters.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @rtype: dict
    @return: the instance dict, with C{hvparams}, C{beparams} and the
        per-NIC C{nicparams} filled in

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    hv_filled = cluster.FillHV(instance)
    if hvp is not None:
        # explicit overrides win over the filled values
        hv_filled.update(hvp)
    idict["hvparams"] = hv_filled

    be_filled = cluster.FillBE(instance)
    if bep is not None:
        be_filled.update(bep)
    idict["beparams"] = be_filled

    for nic in idict["nics"]:
        nic_defaults = cluster.nicparams[constants.PP_DEFAULT]
        nic['nicparams'] = objects.FillDict(nic_defaults, nic['nicparams'])
    return idict
306 def _ConnectList(self, client, node_list, call):
307 """Helper for computing node addresses.
309 @type client: L{ganeti.rpc.Client}
310 @param client: a C{Client} instance
311 @type node_list: list
312 @param node_list: the node list we should connect
314 @param call: the name of the remote procedure call, for filling in
315 correctly any eventual offline nodes' results
318 all_nodes = self._cfg.GetAllNodesInfo()
322 for node in node_list:
323 if node in all_nodes:
324 if all_nodes[node].offline:
325 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
327 val = all_nodes[node].primary_ip
330 addr_list.append(val)
331 name_list.append(node)
333 client.ConnectList(name_list, address_list=addr_list)
336 def _ConnectNode(self, client, node, call):
337 """Helper for computing one node's address.
339 @type client: L{ganeti.rpc.Client}
340 @param client: a C{Client} instance
342 @param node: the node we should connect
344 @param call: the name of the remote procedure call, for filling in
345 correctly any eventual offline nodes' results
348 node_info = self._cfg.GetNodeInfo(node)
349 if node_info is not None:
350 if node_info.offline:
351 return RpcResult(node=node, offline=True, call=call)
352 addr = node_info.primary_ip
355 client.ConnectNode(node, address=addr)
def _MultiNodeCall(self, node_list, procedure, args):
    """Run a procedure on several nodes.

    The arguments are serialized to JSON once and sent through a single
    L{Client}; results prepared for skipped nodes are merged with the
    results returned by the client.

    @return: dict of results, keyed by node name

    """
    payload = serializer.DumpJson(args, indent=False)
    client = Client(procedure, payload, self.port)
    results = self._ConnectList(client, node_list, procedure)
    results.update(client.GetResults())
    return results
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None):
    """Run a procedure on several nodes without a runner instance.

    No configuration is consulted: the nodes are connected as given,
    optionally with the addresses from C{address_list}.

    @return: the results returned by the client's C{GetResults()}

    """
    payload = serializer.DumpJson(args, indent=False)
    noded_port = utils.GetDaemonPort(constants.NODED)
    client = Client(procedure, payload, noded_port)
    client.ConnectList(node_list, address_list=address_list)
    return client.GetResults()
def _SingleNodeCall(self, node, procedure, args):
    """Run a procedure on one node.

    @return: the result prepared by C{self._ConnectNode} when the node
        was not connected, otherwise the node's entry in the client's
        results

    """
    payload = serializer.DumpJson(args, indent=False)
    client = Client(procedure, payload, self.port)
    skipped = self._ConnectNode(client, node, procedure)
    if skipped is not None:
        return skipped
    # we did connect, node is not offline
    return client.GetResults()[node]
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args):
    """Run a procedure on one node without a runner instance.

    @return: the node's entry in the client's results

    """
    payload = serializer.DumpJson(args, indent=False)
    noded_port = utils.GetDaemonPort(constants.NODED)
    client = Client(procedure, payload, noded_port)
    client.ConnectNode(node)
    results = client.GetResults()
    return results[node]
402 """Compresses a string for transport over RPC.
404 Small amounts of data are not compressed.
409 @return: Encoded data to send
412 # Small amounts of data are not compressed
414 return (constants.RPC_ENCODING_NONE, data)
416 # Compress with zlib and encode in base64
417 return (constants.RPC_ENCODING_ZLIB_BASE64,
418 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
    """List the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the volume group name, sent as the only argument

    """
    params = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", params)
def call_vg_list(self, node_list):
    """Retrieve the volume group list of the given nodes.

    This is a multi-node call.

    @param node_list: the nodes to query; no arguments are sent

    """
    no_args = []
    return self._MultiNodeCall(node_list, "vg_list", no_args)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Retrieve the list of storage units from the given nodes.

    This is a multi-node call. C{su_name}, C{su_args}, C{name} and
    C{fields} are forwarded unchanged, in this order.

    """
    params = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", params)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Request the modification of a storage unit.

    This is a single-node call. C{su_name}, C{su_args}, C{name} and
    C{changes} are forwarded unchanged, in this order.

    """
    params = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", params)
def call_storage_execute(self, node, su_name, su_args, name, op):
    """Request the execution of an operation on a storage unit.

    This is a single-node call. C{su_name}, C{su_args}, C{name} and
    C{op} are forwarded unchanged, in this order.

    """
    params = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", params)
def call_bridges_exist(self, node, bridges_list):
    """Check whether a node has all the given bridges.

    The remote node is asked whether every bridge in C{bridges_list} is
    present, so that an instance using interfaces on those bridges can
    be started there.

    This is a single-node call.

    """
    params = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", params)
def call_instance_start(self, node, instance, hvp, bep):
    """Request the start of an instance.

    This is a single-node call. The instance is converted with
    C{self._InstDict}, passing C{hvp} and C{bep} as parameter overrides.

    """
    instance_dict = self._InstDict(instance, hvp=hvp, bep=bep)
    params = [instance_dict]
    return self._SingleNodeCall(node, "instance_start", params)
def call_instance_shutdown(self, node, instance, timeout):
    """Request the shutdown of an instance.

    This is a single-node call. The instance, converted with
    C{self._InstDict}, and C{timeout} are sent in this order.

    """
    instance_dict = self._InstDict(instance)
    params = [instance_dict, timeout]
    return self._SingleNodeCall(node, "instance_shutdown", params)
def call_migration_info(self, node, instance):
    """Collect the information needed to prepare an instance migration.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    instance_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "migration_info", [instance_dict])
def call_accept_instance(self, node, instance, info, target):
    """Ask a node to get ready to accept an instance.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @param target: target hostname (usually ip address) (on the node
        itself)

    """
    instance_dict = self._InstDict(instance)
    params = [instance_dict, info, target]
    return self._SingleNodeCall(node, "accept_instance", params)
def call_finalize_migration(self, node, instance, info, success):
    """Finish the target-node side of a migration.

    To be called after a successful migration as well as after a failed
    one (in which case the migration should be aborted).

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    instance_dict = self._InstDict(instance)
    params = [instance_dict, info, success]
    return self._SingleNodeCall(node, "finalize_migration", params)
def call_instance_migrate(self, node, instance, target, live):
    """Request the migration of an instance.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    instance_dict = self._InstDict(instance)
    params = [instance_dict, target, live]
    return self._SingleNodeCall(node, "instance_migrate", params)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
    """Request the reboot of an instance.

    This is a single-node call. The instance, converted with
    C{self._InstDict}, is sent followed by C{reboot_type} and
    C{shutdown_timeout}.

    """
    instance_dict = self._InstDict(inst)
    params = [instance_dict, reboot_type, shutdown_timeout]
    return self._SingleNodeCall(node, "instance_reboot", params)
def call_instance_os_add(self, node, inst, reinstall, debug):
    """Request the installation of an OS on the given instance.

    This is a single-node call. The instance, converted with
    C{self._InstDict}, is sent followed by C{reinstall} and C{debug}.

    """
    instance_dict = self._InstDict(inst)
    params = [instance_dict, reinstall, debug]
    return self._SingleNodeCall(node, "instance_os_add", params)
def call_instance_run_rename(self, node, inst, old_name, debug):
    """Request a run of the OS rename script for an instance.

    This is a single-node call. The instance, converted with
    C{self._InstDict}, is sent followed by C{old_name} and C{debug}.

    """
    instance_dict = self._InstDict(inst)
    params = [instance_dict, old_name, debug]
    return self._SingleNodeCall(node, "instance_run_rename", params)
def call_instance_info(self, node, instance, hname):
    """Query information about a single instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @param hname: the hypervisor type of the instance

    """
    params = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", params)
def call_instance_migratable(self, node, instance):
    """Ask whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    instance_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_migratable",
                                [instance_dict])
def call_all_instances_info(self, node_list, hypervisor_list):
    """Query information about all instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "all_instances_info", params)
def call_instance_list(self, node_list, hypervisor_list):
    """Query the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", params)
653 def call_node_tcp_ping(self, node, source, target, port, timeout,
655 """Do a TcpPing on the remote node
657 This is a single-node call.
660 return self._SingleNodeCall(node, "node_tcp_ping",
661 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
    """Ask a node whether it has the given IP address.

    This is a single-node call.

    """
    params = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", params)
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Query node information.

    The nodes are asked about memory and about the size and free space
    of a volume group.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask

    """
    params = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", params)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Request the addition of a node to the cluster.

    This is a single-node call. The six key arguments are forwarded
    unchanged, in signature order.

    """
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", params)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Ask the nodes to verify the given parameters.

    This is a multi-node call. C{checkdict} and C{cluster_name} are
    forwarded unchanged, in this order.

    """
    params = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", params)
@classmethod
def call_node_start_master(cls, node, start_daemons, no_voting):
    """Ask a node to activate itself as a master.

    This is a single-node call, made without a runner instance.

    """
    params = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", params)
@classmethod
def call_node_stop_master(cls, node, stop_daemons):
    """Ask a node to demote itself from master status.

    This is a single-node call, made without a runner instance.

    """
    params = [stop_daemons]
    return cls._StaticSingleNodeCall(node, "node_stop_master", params)
@classmethod
def call_master_info(cls, node_list):
    """Query the master information of the given nodes.

    This is a multi-node call, made without a runner instance; no
    arguments are sent.

    """
    # TODO: should this method query down nodes?
    no_args = []
    return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
@classmethod
def call_version(cls, node_list):
    """Query the version of the given nodes.

    This is a multi-node call, made without a runner instance; no
    arguments are sent.

    """
    no_args = []
    return cls._StaticMultiNodeCall(node_list, "version", no_args)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request the creation of a given block device.

    This is a single-node call. C{bdev} is sent as the result of its
    C{ToDict()} method, followed by the remaining arguments in
    signature order.

    """
    params = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", params)
def call_blockdev_remove(self, node, bdev):
    """Request the removal of a given block device.

    This is a single-node call. C{bdev} is sent as the result of its
    C{ToDict()} method.

    """
    params = [bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_remove", params)
def call_blockdev_rename(self, node, devlist):
    """Request the rename of the given block devices.

    This is a single-node call.

    @param devlist: iterable of (device, uid) pairs; each device is
        sent as the result of its C{ToDict()} method, and the resulting
        list of pairs is used as the argument list of the call

    """
    renames = []
    for device, uid in devlist:
        renames.append((device.ToDict(), uid))
    return self._SingleNodeCall(node, "blockdev_rename", renames)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Request the assembly of a given block device.

    This is a single-node call. C{disk} is sent as the result of its
    C{ToDict()} method, followed by C{owner} and C{on_primary}.

    """
    params = [disk.ToDict(), owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", params)
def call_blockdev_shutdown(self, node, disk):
    """Request the shutdown of a given block device.

    This is a single-node call. C{disk} is sent as the result of its
    C{ToDict()} method.

    """
    params = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_shutdown", params)
def call_blockdev_addchildren(self, node, bdev, ndevs):
    """Request adding a list of children to a (mirroring) device.

    This is a single-node call. C{bdev} and every entry of C{ndevs} are
    sent as the result of their C{ToDict()} method.

    """
    parent = bdev.ToDict()
    children = [disk.ToDict() for disk in ndevs]
    return self._SingleNodeCall(node, "blockdev_addchildren",
                                [parent, children])
def call_blockdev_removechildren(self, node, bdev, ndevs):
    """Request removing a list of children from a (mirroring) device.

    This is a single-node call. C{bdev} and every entry of C{ndevs} are
    sent as the result of their C{ToDict()} method.

    """
    parent = bdev.ToDict()
    children = [disk.ToDict() for disk in ndevs]
    return self._SingleNodeCall(node, "blockdev_removechildren",
                                [parent, children])
def call_blockdev_getmirrorstatus(self, node, disks):
    """Request the status of (mirroring) devices.

    This is a single-node call. The C{ToDict()} forms of C{disks} make
    up the argument list of the call.

    @return: the call result; when it carries no failure message, every
        payload entry has been converted with
        C{objects.BlockDevStatus.FromDict}

    """
    disk_dicts = [dsk.ToDict() for dsk in disks]
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                  disk_dicts)
    if result.fail_msg:
        # failed calls are handed back untouched
        return result
    statuses = []
    for entry in result.payload:
        statuses.append(objects.BlockDevStatus.FromDict(entry))
    result.payload = statuses
    return result
def call_blockdev_find(self, node, disk):
    """Request the identification of a given block device.

    This is a single-node call. C{disk} is sent as the result of its
    C{ToDict()} method.

    @return: the call result; a payload that is not None on a
        successful call has been converted with
        C{objects.BlockDevStatus.FromDict}

    """
    params = [disk.ToDict()]
    result = self._SingleNodeCall(node, "blockdev_find", params)
    if result.fail_msg or result.payload is None:
        # nothing to convert
        return result
    result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result
def call_blockdev_close(self, node, instance_name, disks):
    """Request closing of the given block devices.

    This is a single-node call. C{instance_name} is sent first,
    followed by the list of C{ToDict()} forms of C{disks}.

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
def call_blockdev_getsizes(self, node, disks):
    """Query the size of the given disks.

    This is a single-node call; the remote procedure is named
    C{blockdev_getsize} and receives the list of C{ToDict()} forms of
    C{disks} as its only argument.

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Request the network disconnection of the given drbd devices.

    This is a multi-node call. C{nodes_ip} is sent first, followed by
    the list of C{ToDict()} forms of C{disks}.

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    params = [nodes_ip, disk_dicts]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net", params)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Request the network attachment of the given drbd devices.

    This is a multi-node call. C{nodes_ip} is sent first, followed by
    the list of C{ToDict()} forms of C{disks}, C{instance_name} and
    C{multimaster}.

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    params = [nodes_ip, disk_dicts, instance_name, multimaster]
    return self._MultiNodeCall(node_list, "drbd_attach_net", params)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Wait until the synchronization of drbd devices is complete.

    This is a multi-node call. C{nodes_ip} is sent first, followed by
    the list of C{ToDict()} forms of C{disks}.

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    params = [nodes_ip, disk_dicts]
    return self._MultiNodeCall(node_list, "drbd_wait_sync", params)
@classmethod
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a file to the given nodes.

    The file is read locally and sent, encoded by C{cls._Compress},
    together with its name, mode, owner, group and access/modification
    times. The receiving node may refuse files it does not allow.

    This is a multi-node call, made without a runner instance.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @type file_name: str
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed

    """
    encoded = cls._Compress(utils.ReadFile(file_name))
    # the stat call happens after the read, as the contents come first
    info = os.stat(file_name)
    return cls._StaticMultiNodeCall(node_list, "upload_file",
                                    [file_name, encoded, info.st_mode,
                                     info.st_uid, info.st_gid,
                                     info.st_atime, info.st_mtime],
                                    address_list=address_list)
@classmethod
def call_write_ssconf_files(cls, node_list, values):
    """Request writing of the ssconf files.

    This is a multi-node call, made without a runner instance;
    C{values} is sent as the only argument.

    """
    params = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", params)
def call_os_diagnose(self, node_list):
    """Request a diagnose of the OS definitions.

    This is a multi-node call; no arguments are sent.

    """
    no_args = []
    return self._MultiNodeCall(node_list, "os_diagnose", no_args)
def call_os_get(self, node, name):
    """Retrieve an OS definition.

    This is a single-node call.

    @return: the call result; on a successful call a dict payload has
        been converted with C{objects.OS.FromDict}

    """
    result = self._SingleNodeCall(node, "os_get", [name])
    succeeded = not result.fail_msg
    if succeeded and isinstance(result.payload, dict):
        os_obj = objects.OS.FromDict(result.payload)
        result.payload = os_obj
    return result
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Request a run of the hooks runner.

    This is a multi-node call. C{hpath}, C{phase} and C{env} (a
    dictionary with the environment) are forwarded unchanged, in this
    order.

    """
    return self._MultiNodeCall(node_list, "hooks_runner",
                               [hpath, phase, env])
def call_iallocator_runner(self, node, name, idata):
    """Request a run of an iallocator on a remote node.

    This is a single-node call.

    @param name: the iallocator name
    @param idata: the json-encoded input string

    """
    params = [name, idata]
    return self._SingleNodeCall(node, "iallocator_runner", params)
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing of the given block device.

    This is a single-node call. C{cf_bdev} is sent as the result of its
    C{ToDict()} method, followed by C{amount}.

    """
    params = [cf_bdev.ToDict(), amount]
    return self._SingleNodeCall(node, "blockdev_grow", params)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
    """Request the export of a given disk to another node.

    This is a single-node call. C{cf_bdev} is sent as the result of its
    C{ToDict()} method, followed by C{dest_node}, C{dest_path} and
    C{cluster_name}.

    """
    params = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
    return self._SingleNodeCall(node, "blockdev_export", params)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call. C{cf_bdev} is sent as the result of its
    C{ToDict()} method.

    """
    params = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", params)
def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
                         cluster_name, idx, debug):
    """Request the export of a given snapshot.

    This is a single-node call. C{snap_bdev} is sent as the result of
    its C{ToDict()} method and C{instance} as converted by
    C{self._InstDict}; the other arguments are forwarded unchanged.

    """
    snapshot_dict = snap_bdev.ToDict()
    instance_dict = self._InstDict(instance)
    params = [snapshot_dict, dest_node, instance_dict, cluster_name,
              idx, debug]
    return self._SingleNodeCall(node, "snapshot_export", params)
def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param snap_disks: the snapshot disks; boolean entries are sent as
        they are, all other entries as the result of their C{ToDict()}
        method

    """
    flat_disks = [disk if isinstance(disk, bool) else disk.ToDict()
                  for disk in snap_disks]
    instance_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "finalize_export",
                                [instance_dict, flat_disks])
def call_export_info(self, node, path):
    """Query the export information found in a given path.

    This is a single-node call.

    """
    params = [path]
    return self._SingleNodeCall(node, "export_info", params)
def call_instance_os_import(self, node, inst, src_node, src_images,
                            cluster_name, debug):
    """Request the import of a backup into an instance.

    This is a single-node call. The instance, converted with
    C{self._InstDict}, is sent followed by the remaining arguments in
    signature order.

    """
    instance_dict = self._InstDict(inst)
    params = [instance_dict, src_node, src_images, cluster_name, debug]
    return self._SingleNodeCall(node, "instance_os_import", params)
def call_export_list(self, node_list):
    """Retrieve the list of stored exports.

    This is a multi-node call; no arguments are sent.

    """
    no_args = []
    return self._MultiNodeCall(node_list, "export_list", no_args)
def call_export_remove(self, node, export):
    """Request the removal of a given export.

    This is a single-node call.

    """
    params = [export]
    return self._SingleNodeCall(node, "export_remove", params)
@classmethod
def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Ask a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti
    data directory on that node.

    This is a single-node call, made without a runner instance;
    C{modify_ssh_setup} is sent as the only argument.

    """
    params = [modify_ssh_setup]
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", params)
def call_node_volumes(self, node_list):
    """Retrieve all volumes on the given node(s).

    This is a multi-node call; no arguments are sent.

    """
    no_args = []
    return self._MultiNodeCall(node_list, "node_volumes", no_args)
def call_node_demote_from_mc(self, node):
    """Ask a node to give up the master candidate role.

    This is a single-node call; no arguments are sent.

    """
    no_args = []
    return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
def call_node_powercycle(self, node, hypervisor):
    """Try to powercycle a node.

    This is a single-node call; C{hypervisor} is sent as the only
    argument.

    """
    params = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", params)
def call_test_delay(self, node_list, duration):
    """Ask the given node(s) to sleep for a fixed time.

    This is a multi-node call; C{duration} is sent as the only
    argument.

    """
    params = [duration]
    return self._MultiNodeCall(node_list, "test_delay", params)
def call_file_storage_dir_create(self, node, file_storage_dir):
    """Request the creation of the given file storage directory.

    This is a single-node call.

    """
    params = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_create", params)
def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Request the removal of the given file storage directory.

    This is a single-node call.

    """
    params = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_remove", params)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Request the rename of a file storage directory.

    This is a single-node call; the old directory is sent first, then
    the new one.

    """
    params = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", params)
@classmethod
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Request an update of a job queue file.

    This is a multi-node call, made without a runner instance.
    C{file_name} is sent first, followed by C{content} as encoded by
    C{cls._Compress}.

    """
    encoded = cls._Compress(content)
    params = [file_name, encoded]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", params,
                                    address_list=address_list)
@classmethod
def call_jobqueue_purge(cls, node):
    """Request a purge of the job queue.

    This is a single-node call, made without a runner instance; no
    arguments are sent.

    """
    no_args = []
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
@classmethod
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Request the rename of a job queue file.

    This is a multi-node call, made without a runner instance.

    @param rename: used as the complete argument list of the call (it
        is not wrapped in another list)

    """
    call_args = rename
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename",
                                    call_args, address_list=address_list)
@classmethod
def call_jobqueue_set_drain(cls, node_list, drain_flag):
    """Set or reset the drain flag on the queue.

    This is a multi-node call, made without a runner instance.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type drain_flag: bool
    @param drain_flag: if True, will set the drain flag, otherwise
        reset it.

    """
    params = [drain_flag]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain", params)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Ask the nodes to validate hypervisor parameters.

    The given parameters are first combined, through
    C{objects.FillDict}, with the cluster's parameters for that
    hypervisor (an empty dict when the cluster has none); the combined
    dict is what gets sent.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated

    """
    cluster = self._cfg.GetClusterInfo()
    cluster_defaults = cluster.hvparams.get(hvname, {})
    hv_full = objects.FillDict(cluster_defaults, hvparams)
    params = [hvname, hv_full]
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               params)
def call_create_x509_certificate(self, node, validity):
    """Request the creation of a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param validity: Validity in seconds

    """
    params = [validity]
    return self._SingleNodeCall(node, "create_x509_certificate", params)
def call_remove_x509_certificate(self, node, name):
    """Request the removal of an X509 certificate.

    This is a single-node call.

    @param name: Certificate name

    """
    params = [name]
    return self._SingleNodeCall(node, "remove_x509_certificate", params)