4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
82 calls we can't raise an exception just because one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
86 @ivar call: the name of the RPC call
87 @ivar node: the name of the node to which we made the call
88 @ivar offline: whether the operation failed because the node was
89 offline, as opposed to actual failure; offline=True will always
90 imply failed=True, in order to allow simpler checking if
91 the user doesn't care about the exact failure mode
92 @ivar fail_msg: the error message if the call failed
95 def __init__(self, data=None, failed=False, offline=False,
96 call=None, node=None):
97 self.offline = offline
101 self.fail_msg = "Node is marked offline"
102 self.data = self.payload = None
104 self.fail_msg = self._EnsureErr(data)
105 self.data = self.payload = None
108 if not isinstance(self.data, (tuple, list)):
109 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
112 self.fail_msg = ("RPC layer error: invalid result length (%d), "
113 "expected 2" % len(self.data))
114 elif not self.data[0]:
115 self.fail_msg = self._EnsureErr(self.data[1])
119 self.payload = data[1]
123 """Helper to ensure we return a 'True' value for error."""
127 return "No error information"
129 def Raise(self, msg, prereq=False):
130 """If the result has failed, raise an OpExecError.
132 This is used so that LU code doesn't have to check for each
133 result, but instead can call this function.
136 if not self.fail_msg:
139 if not msg: # one could pass None for default message
140 msg = ("Call '%s' to node '%s' has failed: %s" %
141 (self.call, self.node, self.fail_msg))
143 msg = "%s: %s" % (msg, self.fail_msg)
145 ec = errors.OpPrereqError
147 ec = errors.OpExecError
154 This class, given a (remote) method name, a list of parameters and a
155 list of nodes, will contact (in parallel) all nodes, and return a
156 dict of results (key: node name, value: result).
158 One current bug is that generic failure is still signaled by
159 'False' result, which is not good. This overloading of values can
163 def __init__(self, procedure, body, port):
164 self.procedure = procedure
170 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
171 ssl_cert_path=constants.SSL_CERT_FILE)
173 def ConnectList(self, node_list, address_list=None):
174 """Add a list of nodes to the target nodes.
176 @type node_list: list
177 @param node_list: the list of node names to connect
178 @type address_list: list or None
179 @keyword address_list: either None or a list with node addresses,
180 which must have the same length as the node list
183 if address_list is None:
184 address_list = [None for _ in node_list]
186 assert len(node_list) == len(address_list), \
187 "Name and address lists should have the same length"
188 for node, address in zip(node_list, address_list):
189 self.ConnectNode(node, address)
191 def ConnectNode(self, name, address=None):
192 """Add a node to the target list.
195 @param name: the node name
197 @keyword address: the node address, if known
204 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
205 "/%s" % self.procedure,
207 ssl_params=self._ssl_params,
208 ssl_verify_peer=True)
210 def GetResults(self):
211 """Call nodes and return results.
214 @return: List of RPC results
217 assert _http_manager, "RPC module not initialized"
219 _http_manager.ExecRequests(self.nc.values())
223 for name, req in self.nc.iteritems():
224 if req.success and req.resp_status_code == http.HTTP_OK:
225 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
226 node=name, call=self.procedure)
229 # TODO: Better error reporting
235 logging.error("RPC error in %s from node %s: %s",
236 self.procedure, name, msg)
237 results[name] = RpcResult(data=msg, failed=True, node=name,
243 class RpcRunner(object):
244 """RPC runner class"""
246 def __init__(self, cfg):
247 """Initialized the rpc runner.
249 @type cfg: C{config.ConfigWriter}
250 @param cfg: the configuration object that will be used to get data
255 self.port = utils.GetDaemonPort(constants.NODED)
257 def _InstDict(self, instance, hvp=None, bep=None):
258 """Convert the given instance to a dict.
260 This is done via the instance's ToDict() method and additionally
261 we fill the hvparams with the cluster defaults.
263 @type instance: L{objects.Instance}
264 @param instance: an Instance object
265 @type hvp: dict or None
266 @param hvp: a dictionary with overridden hypervisor parameters
267 @type bep: dict or None
268 @param bep: a dictionary with overridden backend parameters
270 @return: the instance dict, with the hvparams filled with the
274 idict = instance.ToDict()
275 cluster = self._cfg.GetClusterInfo()
276 idict["hvparams"] = cluster.FillHV(instance)
278 idict["hvparams"].update(hvp)
279 idict["beparams"] = cluster.FillBE(instance)
281 idict["beparams"].update(bep)
282 for nic in idict["nics"]:
283 nic['nicparams'] = objects.FillDict(
284 cluster.nicparams[constants.PP_DEFAULT],
288 def _ConnectList(self, client, node_list, call):
289 """Helper for computing node addresses.
291 @type client: L{ganeti.rpc.Client}
292 @param client: a C{Client} instance
293 @type node_list: list
294 @param node_list: the node list we should connect
296 @param call: the name of the remote procedure call, for filling in
297 correctly any eventual offline nodes' results
300 all_nodes = self._cfg.GetAllNodesInfo()
304 for node in node_list:
305 if node in all_nodes:
306 if all_nodes[node].offline:
307 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
309 val = all_nodes[node].primary_ip
312 addr_list.append(val)
313 name_list.append(node)
315 client.ConnectList(name_list, address_list=addr_list)
318 def _ConnectNode(self, client, node, call):
319 """Helper for computing one node's address.
321 @type client: L{ganeti.rpc.Client}
322 @param client: a C{Client} instance
324 @param node: the node we should connect
326 @param call: the name of the remote procedure call, for filling in
327 correctly any eventual offline nodes' results
330 node_info = self._cfg.GetNodeInfo(node)
331 if node_info is not None:
332 if node_info.offline:
333 return RpcResult(node=node, offline=True, call=call)
334 addr = node_info.primary_ip
337 client.ConnectNode(node, address=addr)
339 def _MultiNodeCall(self, node_list, procedure, args):
340 """Helper for making a multi-node call
343 body = serializer.DumpJson(args, indent=False)
344 c = Client(procedure, body, self.port)
345 skip_dict = self._ConnectList(c, node_list, procedure)
346 skip_dict.update(c.GetResults())
350 def _StaticMultiNodeCall(cls, node_list, procedure, args,
352 """Helper for making a multi-node static call
355 body = serializer.DumpJson(args, indent=False)
356 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
357 c.ConnectList(node_list, address_list=address_list)
358 return c.GetResults()
360 def _SingleNodeCall(self, node, procedure, args):
361 """Helper for making a single-node call
364 body = serializer.DumpJson(args, indent=False)
365 c = Client(procedure, body, self.port)
366 result = self._ConnectNode(c, node, procedure)
368 # we did connect, node is not offline
369 result = c.GetResults()[node]
373 def _StaticSingleNodeCall(cls, node, procedure, args):
374 """Helper for making a single-node static call
377 body = serializer.DumpJson(args, indent=False)
378 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
380 return c.GetResults()[node]
384 """Compresses a string for transport over RPC.
386 Small amounts of data are not compressed.
391 @return: Encoded data to send
394 # Small amounts of data are not compressed
396 return (constants.RPC_ENCODING_NONE, data)
398 # Compress with zlib and encode in base64
399 return (constants.RPC_ENCODING_ZLIB_BASE64,
400 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
  """Retrieve the logical volumes of a volume group from several nodes.

  This is a multi-node call using the C{lv_list} remote procedure.

  @param node_list: the nodes to query
  @param vg_name: the volume group name, sent as the only RPC argument
  @return: the value produced by C{self._MultiNodeCall}

  """
  rpc_args = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", rpc_args)
def call_vg_list(self, node_list):
  """Retrieve the volume group list from several nodes.

  This is a multi-node call using the C{vg_list} remote procedure,
  which takes no arguments.

  @param node_list: the nodes to query
  @return: the value produced by C{self._MultiNodeCall}

  """
  no_args = []
  return self._MultiNodeCall(node_list, "vg_list", no_args)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Request the list of storage units from several nodes.

  This is a multi-node call using the C{storage_list} remote procedure.
  The four storage-related parameters are forwarded unchanged, in the
  order C{su_name, su_args, name, fields}.

  @param node_list: the nodes to query
  @return: the value produced by C{self._MultiNodeCall}

  """
  rpc_args = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", rpc_args)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Ask one node to modify a storage unit.

  This is a single-node call using the C{storage_modify} remote
  procedure. The parameters after C{node} are forwarded unchanged, in
  the order C{su_name, su_args, name, changes}.

  @param node: the node to contact
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", rpc_args)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Ask one node to execute an operation on a storage unit.

  This is a single-node call using the C{storage_execute} remote
  procedure. The parameters after C{node} are forwarded unchanged, in
  the order C{su_name, su_args, name, op}.

  @param node: the node to contact
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", rpc_args)
def call_bridges_exist(self, node, bridges_list):
  """Check that a node has all the given bridges.

  The remote side verifies that every bridge named in C{bridges_list}
  is present, so that an instance using interfaces on those bridges
  can be started there.

  This is a single-node call using the C{bridges_exist} remote
  procedure.

  @param node: the node to check
  @param bridges_list: the bridges to look for, sent as one RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", rpc_args)
def call_instance_start(self, node, instance, hvp, bep):
  """Start an instance on a node.

  This is a single-node call using the C{instance_start} remote
  procedure. The instance is converted with C{self._InstDict}, which
  also receives the C{hvp} and C{bep} overrides.

  @param node: the node on which to start the instance
  @param instance: the instance object to start
  @param hvp: hypervisor parameter overrides, passed to C{_InstDict}
  @param bep: backend parameter overrides, passed to C{_InstDict}
  @return: the value produced by C{self._SingleNodeCall}

  """
  return self._SingleNodeCall(node, "instance_start",
                              [self._InstDict(instance, hvp=hvp, bep=bep)])
def call_instance_shutdown(self, node, instance, timeout):
  """Stop an instance on a node.

  This is a single-node call using the C{instance_shutdown} remote
  procedure.

  @param node: the node to contact
  @param instance: the instance object, converted via C{self._InstDict}
  @param timeout: forwarded unchanged as the second RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, timeout]
  return self._SingleNodeCall(node, "instance_shutdown", rpc_args)
def call_migration_info(self, node, instance):
  """Collect the information needed to prepare an instance migration.

  This is a single-node call using the C{migration_info} remote
  procedure.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via
      C{self._InstDict}
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "migration_info", rpc_args)
def call_accept_instance(self, node, instance, info, target):
  """Prepare a node to accept a migrating instance.

  This is a single-node call using the C{accept_instance} remote
  procedure.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via
      C{self._InstDict}
  @type info: opaque/hypervisor specific (string/data)
  @param info: the result of the call_migration_info call
  @param target: target hostname (usually ip address) (on the node
      itself)
  @return: the value produced by C{self._SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, info, target]
  return self._SingleNodeCall(node, "accept_instance", rpc_args)
def call_finalize_migration(self, node, instance, info, success):
  """Finish the target-node side of a migration.

  Meant to be used both after a successful migration and after a
  failed one (in which case the remote side should abort the
  migration).

  This is a single-node call using the C{finalize_migration} remote
  procedure.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via
      C{self._InstDict}
  @type info: opaque/hypervisor specific (string/data)
  @param info: the result of the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure
  @return: the value produced by C{self._SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, info, success]
  return self._SingleNodeCall(node, "finalize_migration", rpc_args)
def call_instance_migrate(self, node, instance, target, live):
  """Migrate an instance to another node.

  This is a single-node call using the C{instance_migrate} remote
  procedure.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via
      C{self._InstDict}
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @return: the value produced by C{self._SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  rpc_args = [inst_dict, target, live]
  return self._SingleNodeCall(node, "instance_migrate", rpc_args)
551 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
552 """Reboots an instance.
554 This is a single-node call.
557 return self._SingleNodeCall(node, "instance_reboot",
558 [self._InstDict(inst), reboot_type,
def call_instance_os_add(self, node, inst, reinstall):
  """Install an OS on the given instance.

  This is a single-node call using the C{instance_os_add} remote
  procedure.

  @param node: the node to contact
  @param inst: the instance object, converted via C{self._InstDict}
  @param reinstall: forwarded unchanged as the second RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  inst_dict = self._InstDict(inst)
  rpc_args = [inst_dict, reinstall]
  return self._SingleNodeCall(node, "instance_os_add", rpc_args)
def call_instance_run_rename(self, node, inst, old_name):
  """Run the OS rename script for an instance.

  This is a single-node call using the C{instance_run_rename} remote
  procedure.

  @param node: the node to contact
  @param inst: the instance object, converted via C{self._InstDict}
  @param old_name: forwarded unchanged as the second RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  inst_dict = self._InstDict(inst)
  rpc_args = [inst_dict, old_name]
  return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
def call_instance_info(self, node, instance, hname):
  """Fetch information about a single instance from one node.

  This is a single-node call using the C{instance_info} remote
  procedure.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [instance, hname]
  return self._SingleNodeCall(node, "instance_info", rpc_args)
def call_instance_migratable(self, node, instance):
  """Ask a node whether the given instance can be migrated.

  This is a single-node call using the C{instance_migratable} remote
  procedure.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check, converted via
      C{self._InstDict}
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "instance_migratable", rpc_args)
608 def call_all_instances_info(self, node_list, hypervisor_list):
609 """Returns information about all instances on the given nodes.
611 This is a multi-node call.
613 @type node_list: list
614 @param node_list: the list of nodes to query
615 @type hypervisor_list: list
616 @param hypervisor_list: the hypervisors to query for instances
619 return self._MultiNodeCall(node_list, "all_instances_info",
def call_instance_list(self, node_list, hypervisor_list):
  """Fetch the list of running instances from several nodes.

  This is a multi-node call using the C{instance_list} remote
  procedure.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances
  @return: the value produced by C{self._MultiNodeCall}

  """
  rpc_args = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", rpc_args)
635 def call_node_tcp_ping(self, node, source, target, port, timeout,
637 """Do a TcpPing on the remote node
639 This is a single-node call.
642 return self._SingleNodeCall(node, "node_tcp_ping",
643 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
  """Check whether a node has the given IP address.

  This is a single-node call using the C{node_has_ip_address} remote
  procedure.

  @param node: the node to query
  @param address: the address to look for, sent as the only argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Fetch node information from several nodes.

  The remote side reports memory information and volume group size
  and free space.

  This is a multi-node call using the C{node_info} remote procedure.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask
  @return: the value produced by C{self._MultiNodeCall}

  """
  rpc_args = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", rpc_args)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  This is a single-node call using the C{node_add} remote procedure.
  The six parameters after C{node} are forwarded unchanged, in the
  order C{dsa, dsapub, rsa, rsapub, ssh, sshpub}.

  NOTE(review): these look like private/public key pairs; confirm
  against the node daemon.

  @param node: the node to contact
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  return self._SingleNodeCall(node, "node_add", rpc_args)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Request verification of the given parameters on several nodes.

  This is a multi-node call using the C{node_verify} remote procedure.

  @param node_list: the nodes to contact
  @param checkdict: forwarded unchanged as the first RPC argument
  @param cluster_name: forwarded unchanged as the second RPC argument
  @return: the value produced by C{self._MultiNodeCall}

  """
  rpc_args = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", rpc_args)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tell a node to activate itself as a master.

  This is a single-node call using the C{node_start_master} remote
  procedure, issued through the class-level
  C{_StaticSingleNodeCall} helper.

  @param node: the node to contact
  @param start_daemons: forwarded unchanged as the first RPC argument
  @param no_voting: forwarded unchanged as the second RPC argument
  @return: the value produced by C{cls._StaticSingleNodeCall}

  """
  rpc_args = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
def call_node_stop_master(cls, node, stop_daemons):
  """Tell a node to demote itself from master status.

  This is a single-node call using the C{node_stop_master} remote
  procedure, issued through the class-level
  C{_StaticSingleNodeCall} helper.

  @param node: the node to contact
  @param stop_daemons: forwarded unchanged as the only RPC argument
  @return: the value produced by C{cls._StaticSingleNodeCall}

  """
  rpc_args = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
def call_master_info(cls, node_list):
  """Query master information from several nodes.

  This is a multi-node call using the C{master_info} remote procedure
  (no arguments), issued through the class-level
  C{_StaticMultiNodeCall} helper.

  @param node_list: the nodes to query
  @return: the value produced by C{cls._StaticMultiNodeCall}

  """
  # TODO: should this method query down nodes?
  no_args = []
  return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
def call_version(self, node_list):
  """Query the node version from several nodes.

  This is a multi-node call using the C{version} remote procedure,
  which takes no arguments.

  @param node_list: the nodes to query
  @return: the value produced by C{self._MultiNodeCall}

  """
  no_args = []
  return self._MultiNodeCall(node_list, "version", no_args)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Request creation of a block device on a node.

  This is a single-node call using the C{blockdev_create} remote
  procedure.

  @param node: the node to contact
  @param bdev: the device object; its C{ToDict()} form is sent
  @param size: forwarded unchanged
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @param info: forwarded unchanged
  @return: the value produced by C{self._SingleNodeCall}

  """
  bdev_dict = bdev.ToDict()
  rpc_args = [bdev_dict, size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", rpc_args)
def call_blockdev_remove(self, node, bdev):
  """Request removal of a block device from a node.

  This is a single-node call using the C{blockdev_remove} remote
  procedure.

  @param node: the node to contact
  @param bdev: the device object; its C{ToDict()} form is sent
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
def call_blockdev_rename(self, node, devlist):
  """Request the rename of several block devices on a node.

  This is a single-node call using the C{blockdev_rename} remote
  procedure.

  @param node: the node to contact
  @param devlist: iterable of C{(device, uid)} pairs; each device is
      replaced by its C{ToDict()} form and the uid is kept as-is
  @return: the value produced by C{self._SingleNodeCall}

  """
  # The list of pairs is itself the RPC argument list.
  rename_pairs = [(dev.ToDict(), new_uid) for dev, new_uid in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", rename_pairs)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Request the assembly of a block device on a node.

  This is a single-node call using the C{blockdev_assemble} remote
  procedure.

  @param node: the node to contact
  @param disk: the disk object; its C{ToDict()} form is sent
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @return: the value produced by C{self._SingleNodeCall}

  """
  disk_dict = disk.ToDict()
  rpc_args = [disk_dict, owner, on_primary]
  return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
def call_blockdev_shutdown(self, node, disk):
  """Request the shutdown of a block device on a node.

  This is a single-node call using the C{blockdev_shutdown} remote
  procedure.

  @param node: the node to contact
  @param disk: the disk object; its C{ToDict()} form is sent
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
773 def call_blockdev_addchildren(self, node, bdev, ndevs):
774 """Request adding a list of children to a (mirroring) device.
776 This is a single-node call.
779 return self._SingleNodeCall(node, "blockdev_addchildren",
781 [disk.ToDict() for disk in ndevs]])
783 def call_blockdev_removechildren(self, node, bdev, ndevs):
784 """Request removing a list of children from a (mirroring) device.
786 This is a single-node call.
789 return self._SingleNodeCall(node, "blockdev_removechildren",
791 [disk.ToDict() for disk in ndevs]])
793 def call_blockdev_getmirrorstatus(self, node, disks):
794 """Request status of a (mirroring) device.
796 This is a single-node call.
799 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
800 [dsk.ToDict() for dsk in disks])
801 if not result.fail_msg:
802 result.payload = [objects.BlockDevStatus.FromDict(i)
803 for i in result.payload]
806 def call_blockdev_find(self, node, disk):
807 """Request identification of a given block device.
809 This is a single-node call.
812 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
813 if not result.fail_msg and result.payload is not None:
814 result.payload = objects.BlockDevStatus.FromDict(result.payload)
def call_blockdev_close(self, node, instance_name, disks):
  """Close the given block devices on a node.

  This is a single-node call using the C{blockdev_close} remote
  procedure.

  @param node: the node to contact
  @param instance_name: forwarded unchanged as the first RPC argument
  @param disks: iterable of disk objects; their C{ToDict()} forms are
      sent as a list in the second RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
def call_blockdev_getsizes(self, node, disks):
  """Fetch the sizes of the given disks from a node.

  This is a single-node call. Note that the remote procedure name is
  the singular C{blockdev_getsize}, although this method's name is
  plural.

  @param node: the node to query
  @param disks: iterable of disk objects; their C{ToDict()} forms are
      sent as one list argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnect the network of the given drbd devices.

  This is a multi-node call using the C{drbd_disconnect_net} remote
  procedure.

  @param node_list: the nodes to contact
  @param nodes_ip: forwarded unchanged as the first RPC argument
  @param disks: iterable of disk objects; their C{ToDict()} forms are
      sent as a list in the second RPC argument
  @return: the value produced by C{self._MultiNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net", rpc_args)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attach the network of the given drbd devices.

  This is a multi-node call using the C{drbd_attach_net} remote
  procedure.

  @param node_list: the nodes to contact
  @param nodes_ip: forwarded unchanged as the first RPC argument
  @param disks: iterable of disk objects; their C{ToDict()} forms are
      sent as a list in the second RPC argument
  @param instance_name: forwarded unchanged as the third RPC argument
  @param multimaster: forwarded unchanged as the fourth RPC argument
  @return: the value produced by C{self._MultiNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts, instance_name, multimaster]
  return self._MultiNodeCall(node_list, "drbd_attach_net", rpc_args)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Wait until the synchronization of drbd devices is complete.

  This is a multi-node call using the C{drbd_wait_sync} remote
  procedure.

  @param node_list: the nodes to contact
  @param nodes_ip: forwarded unchanged as the first RPC argument
  @param disks: iterable of disk objects; their C{ToDict()} forms are
      sent as a list in the second RPC argument
  @return: the value produced by C{self._MultiNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts]
  return self._MultiNodeCall(node_list, "drbd_wait_sync", rpc_args)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Upload a local file to several nodes.

  The file is read locally, passed through C{cls._Compress}, and sent
  together with its mode, owner, group, access time and modification
  time as reported by C{os.stat}.

  NOTE(review): the receiving node is documented as refusing files it
  does not accept; confirm the exact rule against the node daemon.

  This is a multi-node call using the C{upload_file} remote procedure.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed
  @return: the value produced by C{cls._StaticMultiNodeCall}

  """
  # Read and encode first, then stat, matching the original I/O order.
  encoded = cls._Compress(utils.ReadFile(file_name))
  file_stat = os.stat(file_name)
  rpc_args = [
    file_name,
    encoded,
    file_stat.st_mode,
    file_stat.st_uid,
    file_stat.st_gid,
    file_stat.st_atime,
    file_stat.st_mtime,
    ]
  return cls._StaticMultiNodeCall(node_list, "upload_file", rpc_args,
                                  address_list=address_list)
def call_write_ssconf_files(cls, node_list, values):
  """Write ssconf files on several nodes.

  This is a multi-node call using the C{write_ssconf_files} remote
  procedure, issued through the class-level
  C{_StaticMultiNodeCall} helper.

  @param node_list: the nodes to contact
  @param values: forwarded unchanged as the only RPC argument
  @return: the value produced by C{cls._StaticMultiNodeCall}

  """
  rpc_args = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
def call_os_diagnose(self, node_list):
  """Request a diagnose of OS definitions from several nodes.

  This is a multi-node call using the C{os_diagnose} remote procedure,
  which takes no arguments.

  @param node_list: the nodes to query
  @return: the value produced by C{self._MultiNodeCall}

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
907 def call_os_get(self, node, name):
908 """Returns an OS definition.
910 This is a single-node call.
913 result = self._SingleNodeCall(node, "os_get", [name])
914 if not result.fail_msg and isinstance(result.payload, dict):
915 result.payload = objects.OS.FromDict(result.payload)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner on several nodes.

  This is a multi-node call using the C{hooks_runner} remote
  procedure.

  @param node_list: the nodes to contact
  @param hpath: forwarded unchanged as the first RPC argument
  @param phase: forwarded unchanged as the second RPC argument
  @param env: a dictionary with the environment
  @return: the value produced by C{self._MultiNodeCall}

  """
  return self._MultiNodeCall(node_list, "hooks_runner", [hpath, phase, env])
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call using the C{iallocator_runner} remote
  procedure.

  @param node: the node to contact
  @param name: the iallocator name
  @param idata: the json-encoded input string
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [name, idata]
  return self._SingleNodeCall(node, "iallocator_runner", rpc_args)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Request that a block device be grown on a node.

  This is a single-node call using the C{blockdev_grow} remote
  procedure.

  @param node: the node to contact
  @param cf_bdev: the device object; its C{ToDict()} form is sent
  @param amount: forwarded unchanged as the second RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  bdev_dict = cf_bdev.ToDict()
  rpc_args = [bdev_dict, amount]
  return self._SingleNodeCall(node, "blockdev_grow", rpc_args)
952 def call_blockdev_export(self, node, cf_bdev,
953 dest_node, dest_path, cluster_name):
954 """Export a given disk to another node.
956 This is a single-node call.
959 return self._SingleNodeCall(node, "blockdev_export",
960 [cf_bdev.ToDict(), dest_node, dest_path,
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call using the C{blockdev_snapshot} remote
  procedure.

  @param node: the node to contact
  @param cf_bdev: the device object; its C{ToDict()} form is sent
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [cf_bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_snapshot", rpc_args)
971 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
973 """Request the export of a given snapshot.
975 This is a single-node call.
978 return self._SingleNodeCall(node, "snapshot_export",
979 [snap_bdev.ToDict(), dest_node,
980 self._InstDict(instance), cluster_name, idx])
982 def call_finalize_export(self, node, instance, snap_disks):
983 """Request the completion of an export operation.
985 This writes the export config file, etc.
987 This is a single-node call.
991 for disk in snap_disks:
992 if isinstance(disk, bool):
993 flat_disks.append(disk)
995 flat_disks.append(disk.ToDict())
997 return self._SingleNodeCall(node, "finalize_export",
998 [self._InstDict(instance), flat_disks])
def call_export_info(self, node, path):
  """Query the export information stored in a given path.

  This is a single-node call using the C{export_info} remote
  procedure.

  @param node: the node to query
  @param path: the path to inspect, sent as the only RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [path]
  return self._SingleNodeCall(node, "export_info", rpc_args)
1008 def call_instance_os_import(self, node, inst, src_node, src_images,
1010 """Request the import of a backup into an instance.
1012 This is a single-node call.
1015 return self._SingleNodeCall(node, "instance_os_import",
1016 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
  """Fetch the stored exports list from several nodes.

  This is a multi-node call using the C{export_list} remote procedure,
  which takes no arguments.

  @param node_list: the nodes to query
  @return: the value produced by C{self._MultiNodeCall}

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
def call_export_remove(self, node, export):
  """Request the removal of a given export from a node.

  This is a single-node call using the C{export_remove} remote
  procedure.

  @param node: the node to contact
  @param export: the export to remove, sent as the only RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [export]
  return self._SingleNodeCall(node, "export_remove", rpc_args)
def call_node_leave_cluster(cls, node):
  """Ask a node to clean the cluster information it holds.

  The remote side is expected to remove the configuration information
  from its ganeti data.

  This is a single-node call using the C{node_leave_cluster} remote
  procedure (no arguments), issued through the class-level
  C{_StaticSingleNodeCall} helper.

  @param node: the node to contact
  @return: the value produced by C{cls._StaticSingleNodeCall}

  """
  no_args = []
  return cls._StaticSingleNodeCall(node, "node_leave_cluster", no_args)
def call_node_volumes(self, node_list):
  """Fetch all volumes present on the given node(s).

  This is a multi-node call using the C{node_volumes} remote
  procedure, which takes no arguments.

  @param node_list: the nodes to query
  @return: the value produced by C{self._MultiNodeCall}

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call using the C{node_demote_from_mc} remote
  procedure, which takes no arguments.

  @param node: the node to demote
  @return: the value produced by C{self._SingleNodeCall}

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
def call_node_powercycle(self, node, hypervisor):
  """Try to powercycle a node.

  This is a single-node call using the C{node_powercycle} remote
  procedure.

  @param node: the node to powercycle
  @param hypervisor: forwarded unchanged as the only RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", rpc_args)
def call_test_delay(self, node_list, duration):
  """Make the given node(s) sleep for a fixed time.

  This is a multi-node call using the C{test_delay} remote procedure.

  @param node_list: the nodes to contact
  @param duration: forwarded unchanged as the only RPC argument
  @return: the value produced by C{self._MultiNodeCall}

  """
  rpc_args = [duration]
  return self._MultiNodeCall(node_list, "test_delay", rpc_args)
1081 def call_file_storage_dir_create(self, node, file_storage_dir):
1082 """Create the given file storage directory.
1084 This is a single-node call.
1087 return self._SingleNodeCall(node, "file_storage_dir_create",
1090 def call_file_storage_dir_remove(self, node, file_storage_dir):
1091 """Remove the given file storage directory.
1093 This is a single-node call.
1096 return self._SingleNodeCall(node, "file_storage_dir_remove",
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename a file storage directory on a node.

  This is a single-node call using the C{file_storage_dir_rename}
  remote procedure.

  @param node: the node to contact
  @param old_file_storage_dir: sent as the first RPC argument
  @param new_file_storage_dir: sent as the second RPC argument
  @return: the value produced by C{self._SingleNodeCall}

  """
  rpc_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update a job queue file on several nodes.

  This is a multi-node call using the C{jobqueue_update} remote
  procedure, issued through the class-level
  C{_StaticMultiNodeCall} helper.

  @param node_list: the nodes to contact
  @param address_list: passed on as the C{address_list} keyword
  @param file_name: forwarded unchanged as the first RPC argument
  @param content: passed through C{cls._Compress} before being sent
      as the second RPC argument
  @return: the value produced by C{cls._StaticMultiNodeCall}

  """
  encoded = cls._Compress(content)
  rpc_args = [file_name, encoded]
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                  address_list=address_list)
def call_jobqueue_purge(cls, node):
  """Purge the job queue on a node.

  This is a single-node call using the C{jobqueue_purge} remote
  procedure (no arguments), issued through the class-level
  C{_StaticSingleNodeCall} helper.

  @param node: the node to contact
  @return: the value produced by C{cls._StaticSingleNodeCall}

  """
  no_args = []
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file on several nodes.

  This is a multi-node call using the C{jobqueue_rename} remote
  procedure, issued through the class-level
  C{_StaticMultiNodeCall} helper.

  @param node_list: the nodes to contact
  @param address_list: passed on as the C{address_list} keyword
  @param rename: used directly as the whole RPC argument list (it is
      not wrapped in another list)
  @return: the value produced by C{cls._StaticMultiNodeCall}

  """
  rpc_args = rename
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rpc_args,
                                  address_list=address_list)
1140 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1141 """Set the drain flag on the queue.
1143 This is a multi-node call.
1145 @type node_list: list
1146 @param node_list: the list of nodes to query
1147 @type drain_flag: bool
1148 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1151 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1154 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1155 """Validate the hypervisor params.
1157 This is a multi-node call.
1159 @type node_list: list
1160 @param node_list: the list of nodes to query
1161 @type hvname: string
1162 @param hvname: the hypervisor name
1163 @type hvparams: dict
1164 @param hvparams: the hypervisor parameters to be validated
1167 cluster = self._cfg.GetClusterInfo()
1168 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1169 return self._MultiNodeCall(node_list, "hypervisor_validate_params",