4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
82 calls we can't raise an exception just because one one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
86 @ivar call: the name of the RPC call
87 @ivar node: the name of the node to which we made the call
88 @ivar offline: whether the operation failed because the node was
89 offline, as opposed to actual failure; offline=True will always
90 imply failed=True, in order to allow simpler checking if
91 the user doesn't care about the exact failure mode
92 @ivar fail_msg: the error message if the call failed
95 def __init__(self, data=None, failed=False, offline=False,
96 call=None, node=None):
97 self.offline = offline
102 self.fail_msg = "Node is marked offline"
103 self.data = self.payload = None
105 self.fail_msg = self._EnsureErr(data)
106 self.data = self.payload = None
109 if not isinstance(self.data, (tuple, list)):
110 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114 self.fail_msg = ("RPC layer error: invalid result length (%d), "
115 "expected 2" % len(self.data))
117 elif not self.data[0]:
118 self.fail_msg = self._EnsureErr(self.data[1])
123 self.payload = data[1]
125 assert hasattr(self, "call")
126 assert hasattr(self, "data")
127 assert hasattr(self, "fail_msg")
128 assert hasattr(self, "node")
129 assert hasattr(self, "offline")
130 assert hasattr(self, "payload")
134 """Helper to ensure we return a 'True' value for error."""
138 return "No error information"
140 def Raise(self, msg, prereq=False, ecode=None):
141 """If the result has failed, raise an OpExecError.
143 This is used so that LU code doesn't have to check for each
144 result, but instead can call this function.
147 if not self.fail_msg:
150 if not msg: # one could pass None for default message
151 msg = ("Call '%s' to node '%s' has failed: %s" %
152 (self.call, self.node, self.fail_msg))
154 msg = "%s: %s" % (msg, self.fail_msg)
156 ec = errors.OpPrereqError
158 ec = errors.OpExecError
159 if ecode is not None:
169 This class, given a (remote) method name, a list of parameters and a
170 list of nodes, will contact (in parallel) all nodes, and return a
171 dict of results (key: node name, value: result).
173 One current bug is that generic failure is still signaled by
174 'False' result, which is not good. This overloading of values can
178 def __init__(self, procedure, body, port):
179 self.procedure = procedure
185 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
186 ssl_cert_path=constants.SSL_CERT_FILE)
188 def ConnectList(self, node_list, address_list=None):
189 """Add a list of nodes to the target nodes.
191 @type node_list: list
192 @param node_list: the list of node names to connect
193 @type address_list: list or None
194 @keyword address_list: either None or a list with node addresses,
195 which must have the same length as the node list
198 if address_list is None:
199 address_list = [None for _ in node_list]
201 assert len(node_list) == len(address_list), \
202 "Name and address lists should have the same length"
203 for node, address in zip(node_list, address_list):
204 self.ConnectNode(node, address)
206 def ConnectNode(self, name, address=None):
207 """Add a node to the target list.
210 @param name: the node name
212 @keyword address: the node address, if known
219 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
220 "/%s" % self.procedure,
222 ssl_params=self._ssl_params,
223 ssl_verify_peer=True)
225 def GetResults(self):
226 """Call nodes and return results.
229 @return: List of RPC results
232 assert _http_manager, "RPC module not initialized"
234 _http_manager.ExecRequests(self.nc.values())
238 for name, req in self.nc.iteritems():
239 if req.success and req.resp_status_code == http.HTTP_OK:
240 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
241 node=name, call=self.procedure)
244 # TODO: Better error reporting
250 logging.error("RPC error in %s from node %s: %s",
251 self.procedure, name, msg)
252 results[name] = RpcResult(data=msg, failed=True, node=name,
258 class RpcRunner(object):
259 """RPC runner class"""
261 def __init__(self, cfg):
262 """Initialized the rpc runner.
264 @type cfg: C{config.ConfigWriter}
265 @param cfg: the configuration object that will be used to get data
270 self.port = utils.GetDaemonPort(constants.NODED)
272 def _InstDict(self, instance, hvp=None, bep=None):
273 """Convert the given instance to a dict.
275 This is done via the instance's ToDict() method and additionally
276 we fill the hvparams with the cluster defaults.
278 @type instance: L{objects.Instance}
279 @param instance: an Instance object
280 @type hvp: dict or None
281 @param hvp: a dictionary with overridden hypervisor parameters
282 @type bep: dict or None
283 @param bep: a dictionary with overridden backend parameters
285 @return: the instance dict, with the hvparams filled with the
289 idict = instance.ToDict()
290 cluster = self._cfg.GetClusterInfo()
291 idict["hvparams"] = cluster.FillHV(instance)
293 idict["hvparams"].update(hvp)
294 idict["beparams"] = cluster.FillBE(instance)
296 idict["beparams"].update(bep)
297 for nic in idict["nics"]:
298 nic['nicparams'] = objects.FillDict(
299 cluster.nicparams[constants.PP_DEFAULT],
303 def _ConnectList(self, client, node_list, call):
304 """Helper for computing node addresses.
306 @type client: L{ganeti.rpc.Client}
307 @param client: a C{Client} instance
308 @type node_list: list
309 @param node_list: the node list we should connect
311 @param call: the name of the remote procedure call, for filling in
312 correctly any eventual offline nodes' results
315 all_nodes = self._cfg.GetAllNodesInfo()
319 for node in node_list:
320 if node in all_nodes:
321 if all_nodes[node].offline:
322 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
324 val = all_nodes[node].primary_ip
327 addr_list.append(val)
328 name_list.append(node)
330 client.ConnectList(name_list, address_list=addr_list)
333 def _ConnectNode(self, client, node, call):
334 """Helper for computing one node's address.
336 @type client: L{ganeti.rpc.Client}
337 @param client: a C{Client} instance
339 @param node: the node we should connect
341 @param call: the name of the remote procedure call, for filling in
342 correctly any eventual offline nodes' results
345 node_info = self._cfg.GetNodeInfo(node)
346 if node_info is not None:
347 if node_info.offline:
348 return RpcResult(node=node, offline=True, call=call)
349 addr = node_info.primary_ip
352 client.ConnectNode(node, address=addr)
354 def _MultiNodeCall(self, node_list, procedure, args):
355 """Helper for making a multi-node call
358 body = serializer.DumpJson(args, indent=False)
359 c = Client(procedure, body, self.port)
360 skip_dict = self._ConnectList(c, node_list, procedure)
361 skip_dict.update(c.GetResults())
365 def _StaticMultiNodeCall(cls, node_list, procedure, args,
367 """Helper for making a multi-node static call
370 body = serializer.DumpJson(args, indent=False)
371 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
372 c.ConnectList(node_list, address_list=address_list)
373 return c.GetResults()
375 def _SingleNodeCall(self, node, procedure, args):
376 """Helper for making a single-node call
379 body = serializer.DumpJson(args, indent=False)
380 c = Client(procedure, body, self.port)
381 result = self._ConnectNode(c, node, procedure)
383 # we did connect, node is not offline
384 result = c.GetResults()[node]
388 def _StaticSingleNodeCall(cls, node, procedure, args):
389 """Helper for making a single-node static call
392 body = serializer.DumpJson(args, indent=False)
393 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
395 return c.GetResults()[node]
399 """Compresses a string for transport over RPC.
401 Small amounts of data are not compressed.
406 @return: Encoded data to send
409 # Small amounts of data are not compressed
411 return (constants.RPC_ENCODING_NONE, data)
413 # Compress with zlib and encode in base64
414 return (constants.RPC_ENCODING_ZLIB_BASE64,
415 base64.b64encode(zlib.compress(data, 3)))
421 def call_lv_list(self, node_list, vg_name):
422 """Gets the logical volumes present in a given volume group.
424 This is a multi-node call.
427 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
429 def call_vg_list(self, node_list):
430 """Gets the volume group list.
432 This is a multi-node call.
435 return self._MultiNodeCall(node_list, "vg_list", [])
437 def call_storage_list(self, node_list, su_name, su_args, name, fields):
438 """Get list of storage units.
440 This is a multi-node call.
443 return self._MultiNodeCall(node_list, "storage_list",
444 [su_name, su_args, name, fields])
446 def call_storage_modify(self, node, su_name, su_args, name, changes):
447 """Modify a storage unit.
449 This is a single-node call.
452 return self._SingleNodeCall(node, "storage_modify",
453 [su_name, su_args, name, changes])
455 def call_storage_execute(self, node, su_name, su_args, name, op):
456 """Executes an operation on a storage unit.
458 This is a single-node call.
461 return self._SingleNodeCall(node, "storage_execute",
462 [su_name, su_args, name, op])
464 def call_bridges_exist(self, node, bridges_list):
465 """Checks if a node has all the bridges given.
467 This method checks if all bridges given in the bridges_list are
468 present on the remote node, so that an instance that uses interfaces
469 on those bridges can be started.
471 This is a single-node call.
474 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
476 def call_instance_start(self, node, instance, hvp, bep):
477 """Starts an instance.
479 This is a single-node call.
482 idict = self._InstDict(instance, hvp=hvp, bep=bep)
483 return self._SingleNodeCall(node, "instance_start", [idict])
485 def call_instance_shutdown(self, node, instance, timeout):
486 """Stops an instance.
488 This is a single-node call.
491 return self._SingleNodeCall(node, "instance_shutdown",
492 [self._InstDict(instance), timeout])
494 def call_migration_info(self, node, instance):
495 """Gather the information necessary to prepare an instance migration.
497 This is a single-node call.
500 @param node: the node on which the instance is currently running
501 @type instance: C{objects.Instance}
502 @param instance: the instance definition
505 return self._SingleNodeCall(node, "migration_info",
506 [self._InstDict(instance)])
508 def call_accept_instance(self, node, instance, info, target):
509 """Prepare a node to accept an instance.
511 This is a single-node call.
514 @param node: the target node for the migration
515 @type instance: C{objects.Instance}
516 @param instance: the instance definition
517 @type info: opaque/hypervisor specific (string/data)
518 @param info: result for the call_migration_info call
520 @param target: target hostname (usually ip address) (on the node itself)
523 return self._SingleNodeCall(node, "accept_instance",
524 [self._InstDict(instance), info, target])
526 def call_finalize_migration(self, node, instance, info, success):
527 """Finalize any target-node migration specific operation.
529 This is called both in case of a successful migration and in case of error
530 (in which case it should abort the migration).
532 This is a single-node call.
535 @param node: the target node for the migration
536 @type instance: C{objects.Instance}
537 @param instance: the instance definition
538 @type info: opaque/hypervisor specific (string/data)
539 @param info: result for the call_migration_info call
540 @type success: boolean
541 @param success: whether the migration was a success or a failure
544 return self._SingleNodeCall(node, "finalize_migration",
545 [self._InstDict(instance), info, success])
547 def call_instance_migrate(self, node, instance, target, live):
548 """Migrate an instance.
550 This is a single-node call.
553 @param node: the node on which the instance is currently running
554 @type instance: C{objects.Instance}
555 @param instance: the instance definition
557 @param target: the target node name
559 @param live: whether the migration should be done live or not (the
560 interpretation of this parameter is left to the hypervisor)
563 return self._SingleNodeCall(node, "instance_migrate",
564 [self._InstDict(instance), target, live])
566 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
567 """Reboots an instance.
569 This is a single-node call.
572 return self._SingleNodeCall(node, "instance_reboot",
573 [self._InstDict(inst), reboot_type,
576 def call_instance_os_add(self, node, inst, reinstall):
577 """Installs an OS on the given instance.
579 This is a single-node call.
582 return self._SingleNodeCall(node, "instance_os_add",
583 [self._InstDict(inst), reinstall])
585 def call_instance_run_rename(self, node, inst, old_name):
586 """Run the OS rename script for an instance.
588 This is a single-node call.
591 return self._SingleNodeCall(node, "instance_run_rename",
592 [self._InstDict(inst), old_name])
594 def call_instance_info(self, node, instance, hname):
595 """Returns information about a single instance.
597 This is a single-node call.
600 @param node: the list of nodes to query
601 @type instance: string
602 @param instance: the instance name
604 @param hname: the hypervisor type of the instance
607 return self._SingleNodeCall(node, "instance_info", [instance, hname])
609 def call_instance_migratable(self, node, instance):
610 """Checks whether the given instance can be migrated.
612 This is a single-node call.
614 @param node: the node to query
615 @type instance: L{objects.Instance}
616 @param instance: the instance to check
620 return self._SingleNodeCall(node, "instance_migratable",
621 [self._InstDict(instance)])
623 def call_all_instances_info(self, node_list, hypervisor_list):
624 """Returns information about all instances on the given nodes.
626 This is a multi-node call.
628 @type node_list: list
629 @param node_list: the list of nodes to query
630 @type hypervisor_list: list
631 @param hypervisor_list: the hypervisors to query for instances
634 return self._MultiNodeCall(node_list, "all_instances_info",
637 def call_instance_list(self, node_list, hypervisor_list):
638 """Returns the list of running instances on a given node.
640 This is a multi-node call.
642 @type node_list: list
643 @param node_list: the list of nodes to query
644 @type hypervisor_list: list
645 @param hypervisor_list: the hypervisors to query for instances
648 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
650 def call_node_tcp_ping(self, node, source, target, port, timeout,
652 """Do a TcpPing on the remote node
654 This is a single-node call.
657 return self._SingleNodeCall(node, "node_tcp_ping",
658 [source, target, port, timeout,
661 def call_node_has_ip_address(self, node, address):
662 """Checks if a node has the given IP address.
664 This is a single-node call.
667 return self._SingleNodeCall(node, "node_has_ip_address", [address])
669 def call_node_info(self, node_list, vg_name, hypervisor_type):
670 """Return node information.
672 This will return memory information and volume group size and free
675 This is a multi-node call.
677 @type node_list: list
678 @param node_list: the list of nodes to query
679 @type vg_name: C{string}
680 @param vg_name: the name of the volume group to ask for disk space
682 @type hypervisor_type: C{str}
683 @param hypervisor_type: the name of the hypervisor to ask for
687 return self._MultiNodeCall(node_list, "node_info",
688 [vg_name, hypervisor_type])
690 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
691 """Add a node to the cluster.
693 This is a single-node call.
696 return self._SingleNodeCall(node, "node_add",
697 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
699 def call_node_verify(self, node_list, checkdict, cluster_name):
700 """Request verification of given parameters.
702 This is a multi-node call.
705 return self._MultiNodeCall(node_list, "node_verify",
706 [checkdict, cluster_name])
709 def call_node_start_master(cls, node, start_daemons, no_voting):
710 """Tells a node to activate itself as a master.
712 This is a single-node call.
715 return cls._StaticSingleNodeCall(node, "node_start_master",
716 [start_daemons, no_voting])
719 def call_node_stop_master(cls, node, stop_daemons):
720 """Tells a node to demote itself from master status.
722 This is a single-node call.
725 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
728 def call_master_info(cls, node_list):
729 """Query master info.
731 This is a multi-node call.
734 # TODO: should this method query down nodes?
735 return cls._StaticMultiNodeCall(node_list, "master_info", [])
738 def call_version(cls, node_list):
739 """Query node version.
741 This is a multi-node call.
744 return cls._StaticMultiNodeCall(node_list, "version", [])
746 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
747 """Request creation of a given block device.
749 This is a single-node call.
752 return self._SingleNodeCall(node, "blockdev_create",
753 [bdev.ToDict(), size, owner, on_primary, info])
755 def call_blockdev_remove(self, node, bdev):
756 """Request removal of a given block device.
758 This is a single-node call.
761 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
763 def call_blockdev_rename(self, node, devlist):
764 """Request rename of the given block devices.
766 This is a single-node call.
769 return self._SingleNodeCall(node, "blockdev_rename",
770 [(d.ToDict(), uid) for d, uid in devlist])
772 def call_blockdev_assemble(self, node, disk, owner, on_primary):
773 """Request assembling of a given block device.
775 This is a single-node call.
778 return self._SingleNodeCall(node, "blockdev_assemble",
779 [disk.ToDict(), owner, on_primary])
781 def call_blockdev_shutdown(self, node, disk):
782 """Request shutdown of a given block device.
784 This is a single-node call.
787 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
789 def call_blockdev_addchildren(self, node, bdev, ndevs):
790 """Request adding a list of children to a (mirroring) device.
792 This is a single-node call.
795 return self._SingleNodeCall(node, "blockdev_addchildren",
797 [disk.ToDict() for disk in ndevs]])
799 def call_blockdev_removechildren(self, node, bdev, ndevs):
800 """Request removing a list of children from a (mirroring) device.
802 This is a single-node call.
805 return self._SingleNodeCall(node, "blockdev_removechildren",
807 [disk.ToDict() for disk in ndevs]])
809 def call_blockdev_getmirrorstatus(self, node, disks):
810 """Request status of a (mirroring) device.
812 This is a single-node call.
815 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
816 [dsk.ToDict() for dsk in disks])
817 if not result.fail_msg:
818 result.payload = [objects.BlockDevStatus.FromDict(i)
819 for i in result.payload]
822 def call_blockdev_find(self, node, disk):
823 """Request identification of a given block device.
825 This is a single-node call.
828 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
829 if not result.fail_msg and result.payload is not None:
830 result.payload = objects.BlockDevStatus.FromDict(result.payload)
833 def call_blockdev_close(self, node, instance_name, disks):
834 """Closes the given block devices.
836 This is a single-node call.
839 params = [instance_name, [cf.ToDict() for cf in disks]]
840 return self._SingleNodeCall(node, "blockdev_close", params)
842 def call_blockdev_getsizes(self, node, disks):
843 """Returns the size of the given disks.
845 This is a single-node call.
848 params = [[cf.ToDict() for cf in disks]]
849 return self._SingleNodeCall(node, "blockdev_getsize", params)
851 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
852 """Disconnects the network of the given drbd devices.
854 This is a multi-node call.
857 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
858 [nodes_ip, [cf.ToDict() for cf in disks]])
860 def call_drbd_attach_net(self, node_list, nodes_ip,
861 disks, instance_name, multimaster):
862 """Disconnects the given drbd devices.
864 This is a multi-node call.
867 return self._MultiNodeCall(node_list, "drbd_attach_net",
868 [nodes_ip, [cf.ToDict() for cf in disks],
869 instance_name, multimaster])
871 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
872 """Waits for the synchronization of drbd devices is complete.
874 This is a multi-node call.
877 return self._MultiNodeCall(node_list, "drbd_wait_sync",
878 [nodes_ip, [cf.ToDict() for cf in disks]])
881 def call_upload_file(cls, node_list, file_name, address_list=None):
884 The node will refuse the operation in case the file is not on the
887 This is a multi-node call.
889 @type node_list: list
890 @param node_list: the list of node names to upload to
892 @param file_name: the filename to upload
893 @type address_list: list or None
894 @keyword address_list: an optional list of node addresses, in order
895 to optimize the RPC speed
898 file_contents = utils.ReadFile(file_name)
899 data = cls._Compress(file_contents)
900 st = os.stat(file_name)
901 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
902 st.st_atime, st.st_mtime]
903 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
904 address_list=address_list)
907 def call_write_ssconf_files(cls, node_list, values):
908 """Write ssconf files.
910 This is a multi-node call.
913 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
915 def call_os_diagnose(self, node_list):
916 """Request a diagnose of OS definitions.
918 This is a multi-node call.
921 return self._MultiNodeCall(node_list, "os_diagnose", [])
923 def call_os_get(self, node, name):
924 """Returns an OS definition.
926 This is a single-node call.
929 result = self._SingleNodeCall(node, "os_get", [name])
930 if not result.fail_msg and isinstance(result.payload, dict):
931 result.payload = objects.OS.FromDict(result.payload)
934 def call_hooks_runner(self, node_list, hpath, phase, env):
935 """Call the hooks runner.
938 - op: the OpCode instance
939 - env: a dictionary with the environment
941 This is a multi-node call.
944 params = [hpath, phase, env]
945 return self._MultiNodeCall(node_list, "hooks_runner", params)
947 def call_iallocator_runner(self, node, name, idata):
948 """Call an iallocator on a remote node
951 - name: the iallocator name
952 - input: the json-encoded input string
954 This is a single-node call.
957 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
959 def call_blockdev_grow(self, node, cf_bdev, amount):
960 """Request a snapshot of the given block device.
962 This is a single-node call.
965 return self._SingleNodeCall(node, "blockdev_grow",
966 [cf_bdev.ToDict(), amount])
968 def call_blockdev_export(self, node, cf_bdev,
969 dest_node, dest_path, cluster_name):
970 """Export a given disk to another node.
972 This is a single-node call.
975 return self._SingleNodeCall(node, "blockdev_export",
976 [cf_bdev.ToDict(), dest_node, dest_path,
979 def call_blockdev_snapshot(self, node, cf_bdev):
980 """Request a snapshot of the given block device.
982 This is a single-node call.
985 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
987 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
989 """Request the export of a given snapshot.
991 This is a single-node call.
994 return self._SingleNodeCall(node, "snapshot_export",
995 [snap_bdev.ToDict(), dest_node,
996 self._InstDict(instance), cluster_name, idx])
998 def call_finalize_export(self, node, instance, snap_disks):
999 """Request the completion of an export operation.
1001 This writes the export config file, etc.
1003 This is a single-node call.
1007 for disk in snap_disks:
1008 if isinstance(disk, bool):
1009 flat_disks.append(disk)
1011 flat_disks.append(disk.ToDict())
1013 return self._SingleNodeCall(node, "finalize_export",
1014 [self._InstDict(instance), flat_disks])
1016 def call_export_info(self, node, path):
1017 """Queries the export information in a given path.
1019 This is a single-node call.
1022 return self._SingleNodeCall(node, "export_info", [path])
1024 def call_instance_os_import(self, node, inst, src_node, src_images,
1026 """Request the import of a backup into an instance.
1028 This is a single-node call.
1031 return self._SingleNodeCall(node, "instance_os_import",
1032 [self._InstDict(inst), src_node, src_images,
1035 def call_export_list(self, node_list):
1036 """Gets the stored exports list.
1038 This is a multi-node call.
1041 return self._MultiNodeCall(node_list, "export_list", [])
1043 def call_export_remove(self, node, export):
1044 """Requests removal of a given export.
1046 This is a single-node call.
1049 return self._SingleNodeCall(node, "export_remove", [export])
1052 def call_node_leave_cluster(cls, node, modify_ssh_setup):
1053 """Requests a node to clean the cluster information it has.
1055 This will remove the configuration information from the ganeti data
1058 This is a single-node call.
1061 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1064 def call_node_volumes(self, node_list):
1065 """Gets all volumes on node(s).
1067 This is a multi-node call.
1070 return self._MultiNodeCall(node_list, "node_volumes", [])
1072 def call_node_demote_from_mc(self, node):
1073 """Demote a node from the master candidate role.
1075 This is a single-node call.
1078 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1081 def call_node_powercycle(self, node, hypervisor):
1082 """Tries to powercycle a node.
1084 This is a single-node call.
1087 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1090 def call_test_delay(self, node_list, duration):
1091 """Sleep for a fixed time on given node(s).
1093 This is a multi-node call.
1096 return self._MultiNodeCall(node_list, "test_delay", [duration])
1098 def call_file_storage_dir_create(self, node, file_storage_dir):
1099 """Create the given file storage directory.
1101 This is a single-node call.
1104 return self._SingleNodeCall(node, "file_storage_dir_create",
1107 def call_file_storage_dir_remove(self, node, file_storage_dir):
1108 """Remove the given file storage directory.
1110 This is a single-node call.
1113 return self._SingleNodeCall(node, "file_storage_dir_remove",
1116 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1117 new_file_storage_dir):
1118 """Rename file storage directory.
1120 This is a single-node call.
1123 return self._SingleNodeCall(node, "file_storage_dir_rename",
1124 [old_file_storage_dir, new_file_storage_dir])
1127 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1128 """Update job queue.
1130 This is a multi-node call.
1133 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1134 [file_name, cls._Compress(content)],
1135 address_list=address_list)
1138 def call_jobqueue_purge(cls, node):
1141 This is a single-node call.
1144 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1147 def call_jobqueue_rename(cls, node_list, address_list, rename):
1148 """Rename a job queue file.
1150 This is a multi-node call.
1153 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1154 address_list=address_list)
1157 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1158 """Set the drain flag on the queue.
1160 This is a multi-node call.
1162 @type node_list: list
1163 @param node_list: the list of nodes to query
1164 @type drain_flag: bool
1165 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1168 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1171 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1172 """Validate the hypervisor params.
1174 This is a multi-node call.
1176 @type node_list: list
1177 @param node_list: the list of nodes to query
1178 @type hvname: string
1179 @param hvname: the hypervisor name
1180 @type hvparams: dict
1181 @param hvparams: the hypervisor parameters to be validated
1184 cluster = self._cfg.GetClusterInfo()
1185 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1186 return self._MultiNodeCall(node_list, "hypervisor_validate_params",