4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
82 calls we can't raise an exception just because one one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
86 @ivar call: the name of the RPC call
87 @ivar node: the name of the node to which we made the call
88 @ivar offline: whether the operation failed because the node was
89 offline, as opposed to actual failure; offline=True will always
90 imply failed=True, in order to allow simpler checking if
91 the user doesn't care about the exact failure mode
92 @ivar fail_msg: the error message if the call failed
95 def __init__(self, data=None, failed=False, offline=False,
96 call=None, node=None):
97 self.offline = offline
101 self.fail_msg = "Node is marked offline"
102 self.data = self.payload = None
104 self.fail_msg = self._EnsureErr(data)
105 self.data = self.payload = None
108 if not isinstance(self.data, (tuple, list)):
109 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
112 self.fail_msg = ("RPC layer error: invalid result length (%d), "
113 "expected 2" % len(self.data))
114 elif not self.data[0]:
115 self.fail_msg = self._EnsureErr(self.data[1])
119 self.payload = data[1]
123 """Helper to ensure we return a 'True' value for error."""
127 return "No error information"
129 def Raise(self, msg, prereq=False):
130 """If the result has failed, raise an OpExecError.
132 This is used so that LU code doesn't have to check for each
133 result, but instead can call this function.
136 if not self.fail_msg:
139 if not msg: # one could pass None for default message
140 msg = ("Call '%s' to node '%s' has failed: %s" %
141 (self.call, self.node, self.fail_msg))
143 msg = "%s: %s" % (msg, self.fail_msg)
145 ec = errors.OpPrereqError
147 ec = errors.OpExecError
154 This class, given a (remote) method name, a list of parameters and a
155 list of nodes, will contact (in parallel) all nodes, and return a
156 dict of results (key: node name, value: result).
158 One current bug is that generic failure is still signaled by
159 'False' result, which is not good. This overloading of values can
163 def __init__(self, procedure, body, port):
164 self.procedure = procedure
170 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
171 ssl_cert_path=constants.SSL_CERT_FILE)
173 def ConnectList(self, node_list, address_list=None):
174 """Add a list of nodes to the target nodes.
176 @type node_list: list
177 @param node_list: the list of node names to connect
178 @type address_list: list or None
179 @keyword address_list: either None or a list with node addresses,
180 which must have the same length as the node list
183 if address_list is None:
184 address_list = [None for _ in node_list]
186 assert len(node_list) == len(address_list), \
187 "Name and address lists should have the same length"
188 for node, address in zip(node_list, address_list):
189 self.ConnectNode(node, address)
191 def ConnectNode(self, name, address=None):
192 """Add a node to the target list.
195 @param name: the node name
197 @keyword address: the node address, if known
204 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
205 "/%s" % self.procedure,
207 ssl_params=self._ssl_params,
208 ssl_verify_peer=True)
210 def GetResults(self):
211 """Call nodes and return results.
214 @return: List of RPC results
217 assert _http_manager, "RPC module not initialized"
219 _http_manager.ExecRequests(self.nc.values())
223 for name, req in self.nc.iteritems():
224 if req.success and req.resp_status_code == http.HTTP_OK:
225 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
226 node=name, call=self.procedure)
229 # TODO: Better error reporting
235 logging.error("RPC error in %s from node %s: %s",
236 self.procedure, name, msg)
237 results[name] = RpcResult(data=msg, failed=True, node=name,
243 class RpcRunner(object):
244 """RPC runner class"""
246 def __init__(self, cfg):
247 """Initialized the rpc runner.
249 @type cfg: C{config.ConfigWriter}
250 @param cfg: the configuration object that will be used to get data
255 self.port = utils.GetDaemonPort(constants.NODED)
257 def _InstDict(self, instance, hvp=None, bep=None):
258 """Convert the given instance to a dict.
260 This is done via the instance's ToDict() method and additionally
261 we fill the hvparams with the cluster defaults.
263 @type instance: L{objects.Instance}
264 @param instance: an Instance object
265 @type hvp: dict or None
266 @param hvp: a dictionary with overridden hypervisor parameters
267 @type bep: dict or None
268 @param bep: a dictionary with overridden backend parameters
270 @return: the instance dict, with the hvparams filled with the
274 idict = instance.ToDict()
275 cluster = self._cfg.GetClusterInfo()
276 idict["hvparams"] = cluster.FillHV(instance)
278 idict["hvparams"].update(hvp)
279 idict["beparams"] = cluster.FillBE(instance)
281 idict["beparams"].update(bep)
282 for nic in idict["nics"]:
283 nic['nicparams'] = objects.FillDict(
284 cluster.nicparams[constants.PP_DEFAULT],
288 def _ConnectList(self, client, node_list, call):
289 """Helper for computing node addresses.
291 @type client: L{ganeti.rpc.Client}
292 @param client: a C{Client} instance
293 @type node_list: list
294 @param node_list: the node list we should connect
296 @param call: the name of the remote procedure call, for filling in
297 correctly any eventual offline nodes' results
300 all_nodes = self._cfg.GetAllNodesInfo()
304 for node in node_list:
305 if node in all_nodes:
306 if all_nodes[node].offline:
307 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
309 val = all_nodes[node].primary_ip
312 addr_list.append(val)
313 name_list.append(node)
315 client.ConnectList(name_list, address_list=addr_list)
318 def _ConnectNode(self, client, node, call):
319 """Helper for computing one node's address.
321 @type client: L{ganeti.rpc.Client}
322 @param client: a C{Client} instance
324 @param node: the node we should connect
326 @param call: the name of the remote procedure call, for filling in
327 correctly any eventual offline nodes' results
330 node_info = self._cfg.GetNodeInfo(node)
331 if node_info is not None:
332 if node_info.offline:
333 return RpcResult(node=node, offline=True, call=call)
334 addr = node_info.primary_ip
337 client.ConnectNode(node, address=addr)
339 def _MultiNodeCall(self, node_list, procedure, args):
340 """Helper for making a multi-node call
343 body = serializer.DumpJson(args, indent=False)
344 c = Client(procedure, body, self.port)
345 skip_dict = self._ConnectList(c, node_list, procedure)
346 skip_dict.update(c.GetResults())
350 def _StaticMultiNodeCall(cls, node_list, procedure, args,
352 """Helper for making a multi-node static call
355 body = serializer.DumpJson(args, indent=False)
356 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
357 c.ConnectList(node_list, address_list=address_list)
358 return c.GetResults()
360 def _SingleNodeCall(self, node, procedure, args):
361 """Helper for making a single-node call
364 body = serializer.DumpJson(args, indent=False)
365 c = Client(procedure, body, self.port)
366 result = self._ConnectNode(c, node, procedure)
368 # we did connect, node is not offline
369 result = c.GetResults()[node]
373 def _StaticSingleNodeCall(cls, node, procedure, args):
374 """Helper for making a single-node static call
377 body = serializer.DumpJson(args, indent=False)
378 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
380 return c.GetResults()[node]
384 """Compresses a string for transport over RPC.
386 Small amounts of data are not compressed.
391 @return: Encoded data to send
394 # Small amounts of data are not compressed
396 return (constants.RPC_ENCODING_NONE, data)
398 # Compress with zlib and encode in base64
399 return (constants.RPC_ENCODING_ZLIB_BASE64,
400 base64.b64encode(zlib.compress(data, 3)))
406 def call_lv_list(self, node_list, vg_name):
407 """Gets the logical volumes present in a given volume group.
409 This is a multi-node call.
412 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
414 def call_vg_list(self, node_list):
415 """Gets the volume group list.
417 This is a multi-node call.
420 return self._MultiNodeCall(node_list, "vg_list", [])
422 def call_storage_list(self, node_list, su_name, su_args, name, fields):
423 """Get list of storage units.
425 This is a multi-node call.
428 return self._MultiNodeCall(node_list, "storage_list",
429 [su_name, su_args, name, fields])
431 def call_storage_modify(self, node, su_name, su_args, name, changes):
432 """Modify a storage unit.
434 This is a single-node call.
437 return self._SingleNodeCall(node, "storage_modify",
438 [su_name, su_args, name, changes])
440 def call_storage_execute(self, node, su_name, su_args, name, op):
441 """Executes an operation on a storage unit.
443 This is a single-node call.
446 return self._SingleNodeCall(node, "storage_execute",
447 [su_name, su_args, name, op])
449 def call_bridges_exist(self, node, bridges_list):
450 """Checks if a node has all the bridges given.
452 This method checks if all bridges given in the bridges_list are
453 present on the remote node, so that an instance that uses interfaces
454 on those bridges can be started.
456 This is a single-node call.
459 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
461 def call_instance_start(self, node, instance, hvp, bep):
462 """Starts an instance.
464 This is a single-node call.
467 idict = self._InstDict(instance, hvp=hvp, bep=bep)
468 return self._SingleNodeCall(node, "instance_start", [idict])
470 def call_instance_shutdown(self, node, instance, timeout):
471 """Stops an instance.
473 This is a single-node call.
476 return self._SingleNodeCall(node, "instance_shutdown",
477 [self._InstDict(instance), timeout])
479 def call_migration_info(self, node, instance):
480 """Gather the information necessary to prepare an instance migration.
482 This is a single-node call.
485 @param node: the node on which the instance is currently running
486 @type instance: C{objects.Instance}
487 @param instance: the instance definition
490 return self._SingleNodeCall(node, "migration_info",
491 [self._InstDict(instance)])
493 def call_accept_instance(self, node, instance, info, target):
494 """Prepare a node to accept an instance.
496 This is a single-node call.
499 @param node: the target node for the migration
500 @type instance: C{objects.Instance}
501 @param instance: the instance definition
502 @type info: opaque/hypervisor specific (string/data)
503 @param info: result for the call_migration_info call
505 @param target: target hostname (usually ip address) (on the node itself)
508 return self._SingleNodeCall(node, "accept_instance",
509 [self._InstDict(instance), info, target])
511 def call_finalize_migration(self, node, instance, info, success):
512 """Finalize any target-node migration specific operation.
514 This is called both in case of a successful migration and in case of error
515 (in which case it should abort the migration).
517 This is a single-node call.
520 @param node: the target node for the migration
521 @type instance: C{objects.Instance}
522 @param instance: the instance definition
523 @type info: opaque/hypervisor specific (string/data)
524 @param info: result for the call_migration_info call
525 @type success: boolean
526 @param success: whether the migration was a success or a failure
529 return self._SingleNodeCall(node, "finalize_migration",
530 [self._InstDict(instance), info, success])
532 def call_instance_migrate(self, node, instance, target, live):
533 """Migrate an instance.
535 This is a single-node call.
538 @param node: the node on which the instance is currently running
539 @type instance: C{objects.Instance}
540 @param instance: the instance definition
542 @param target: the target node name
544 @param live: whether the migration should be done live or not (the
545 interpretation of this parameter is left to the hypervisor)
548 return self._SingleNodeCall(node, "instance_migrate",
549 [self._InstDict(instance), target, live])
551 def call_instance_reboot(self, node, instance, reboot_type):
552 """Reboots an instance.
554 This is a single-node call.
557 return self._SingleNodeCall(node, "instance_reboot",
558 [self._InstDict(instance), reboot_type])
560 def call_instance_os_add(self, node, inst, reinstall):
561 """Installs an OS on the given instance.
563 This is a single-node call.
566 return self._SingleNodeCall(node, "instance_os_add",
567 [self._InstDict(inst), reinstall])
569 def call_instance_run_rename(self, node, inst, old_name):
570 """Run the OS rename script for an instance.
572 This is a single-node call.
575 return self._SingleNodeCall(node, "instance_run_rename",
576 [self._InstDict(inst), old_name])
578 def call_instance_info(self, node, instance, hname):
579 """Returns information about a single instance.
581 This is a single-node call.
584 @param node: the list of nodes to query
585 @type instance: string
586 @param instance: the instance name
588 @param hname: the hypervisor type of the instance
591 return self._SingleNodeCall(node, "instance_info", [instance, hname])
593 def call_instance_migratable(self, node, instance):
594 """Checks whether the given instance can be migrated.
596 This is a single-node call.
598 @param node: the node to query
599 @type instance: L{objects.Instance}
600 @param instance: the instance to check
604 return self._SingleNodeCall(node, "instance_migratable",
605 [self._InstDict(instance)])
607 def call_all_instances_info(self, node_list, hypervisor_list):
608 """Returns information about all instances on the given nodes.
610 This is a multi-node call.
612 @type node_list: list
613 @param node_list: the list of nodes to query
614 @type hypervisor_list: list
615 @param hypervisor_list: the hypervisors to query for instances
618 return self._MultiNodeCall(node_list, "all_instances_info",
621 def call_instance_list(self, node_list, hypervisor_list):
622 """Returns the list of running instances on a given node.
624 This is a multi-node call.
626 @type node_list: list
627 @param node_list: the list of nodes to query
628 @type hypervisor_list: list
629 @param hypervisor_list: the hypervisors to query for instances
632 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
634 def call_node_tcp_ping(self, node, source, target, port, timeout,
636 """Do a TcpPing on the remote node
638 This is a single-node call.
641 return self._SingleNodeCall(node, "node_tcp_ping",
642 [source, target, port, timeout,
645 def call_node_has_ip_address(self, node, address):
646 """Checks if a node has the given IP address.
648 This is a single-node call.
651 return self._SingleNodeCall(node, "node_has_ip_address", [address])
653 def call_node_info(self, node_list, vg_name, hypervisor_type):
654 """Return node information.
656 This will return memory information and volume group size and free
659 This is a multi-node call.
661 @type node_list: list
662 @param node_list: the list of nodes to query
663 @type vg_name: C{string}
664 @param vg_name: the name of the volume group to ask for disk space
666 @type hypervisor_type: C{str}
667 @param hypervisor_type: the name of the hypervisor to ask for
671 return self._MultiNodeCall(node_list, "node_info",
672 [vg_name, hypervisor_type])
674 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
675 """Add a node to the cluster.
677 This is a single-node call.
680 return self._SingleNodeCall(node, "node_add",
681 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
683 def call_node_verify(self, node_list, checkdict, cluster_name):
684 """Request verification of given parameters.
686 This is a multi-node call.
689 return self._MultiNodeCall(node_list, "node_verify",
690 [checkdict, cluster_name])
693 def call_node_start_master(cls, node, start_daemons, no_voting):
694 """Tells a node to activate itself as a master.
696 This is a single-node call.
699 return cls._StaticSingleNodeCall(node, "node_start_master",
700 [start_daemons, no_voting])
703 def call_node_stop_master(cls, node, stop_daemons):
704 """Tells a node to demote itself from master status.
706 This is a single-node call.
709 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
712 def call_master_info(cls, node_list):
713 """Query master info.
715 This is a multi-node call.
718 # TODO: should this method query down nodes?
719 return cls._StaticMultiNodeCall(node_list, "master_info", [])
721 def call_version(self, node_list):
722 """Query node version.
724 This is a multi-node call.
727 return self._MultiNodeCall(node_list, "version", [])
729 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
730 """Request creation of a given block device.
732 This is a single-node call.
735 return self._SingleNodeCall(node, "blockdev_create",
736 [bdev.ToDict(), size, owner, on_primary, info])
738 def call_blockdev_remove(self, node, bdev):
739 """Request removal of a given block device.
741 This is a single-node call.
744 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
746 def call_blockdev_rename(self, node, devlist):
747 """Request rename of the given block devices.
749 This is a single-node call.
752 return self._SingleNodeCall(node, "blockdev_rename",
753 [(d.ToDict(), uid) for d, uid in devlist])
755 def call_blockdev_assemble(self, node, disk, owner, on_primary):
756 """Request assembling of a given block device.
758 This is a single-node call.
761 return self._SingleNodeCall(node, "blockdev_assemble",
762 [disk.ToDict(), owner, on_primary])
764 def call_blockdev_shutdown(self, node, disk):
765 """Request shutdown of a given block device.
767 This is a single-node call.
770 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
772 def call_blockdev_addchildren(self, node, bdev, ndevs):
773 """Request adding a list of children to a (mirroring) device.
775 This is a single-node call.
778 return self._SingleNodeCall(node, "blockdev_addchildren",
780 [disk.ToDict() for disk in ndevs]])
782 def call_blockdev_removechildren(self, node, bdev, ndevs):
783 """Request removing a list of children from a (mirroring) device.
785 This is a single-node call.
788 return self._SingleNodeCall(node, "blockdev_removechildren",
790 [disk.ToDict() for disk in ndevs]])
792 def call_blockdev_getmirrorstatus(self, node, disks):
793 """Request status of a (mirroring) device.
795 This is a single-node call.
798 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
799 [dsk.ToDict() for dsk in disks])
800 if not result.fail_msg:
801 result.payload = [objects.BlockDevStatus.FromDict(i)
802 for i in result.payload]
805 def call_blockdev_find(self, node, disk):
806 """Request identification of a given block device.
808 This is a single-node call.
811 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
812 if not result.fail_msg and result.payload is not None:
813 result.payload = objects.BlockDevStatus.FromDict(result.payload)
816 def call_blockdev_close(self, node, instance_name, disks):
817 """Closes the given block devices.
819 This is a single-node call.
822 params = [instance_name, [cf.ToDict() for cf in disks]]
823 return self._SingleNodeCall(node, "blockdev_close", params)
825 def call_blockdev_getsizes(self, node, disks):
826 """Returns the size of the given disks.
828 This is a single-node call.
831 params = [[cf.ToDict() for cf in disks]]
832 return self._SingleNodeCall(node, "blockdev_getsize", params)
834 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
835 """Disconnects the network of the given drbd devices.
837 This is a multi-node call.
840 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
841 [nodes_ip, [cf.ToDict() for cf in disks]])
843 def call_drbd_attach_net(self, node_list, nodes_ip,
844 disks, instance_name, multimaster):
845 """Disconnects the given drbd devices.
847 This is a multi-node call.
850 return self._MultiNodeCall(node_list, "drbd_attach_net",
851 [nodes_ip, [cf.ToDict() for cf in disks],
852 instance_name, multimaster])
854 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
855 """Waits for the synchronization of drbd devices is complete.
857 This is a multi-node call.
860 return self._MultiNodeCall(node_list, "drbd_wait_sync",
861 [nodes_ip, [cf.ToDict() for cf in disks]])
864 def call_upload_file(cls, node_list, file_name, address_list=None):
867 The node will refuse the operation in case the file is not on the
870 This is a multi-node call.
872 @type node_list: list
873 @param node_list: the list of node names to upload to
875 @param file_name: the filename to upload
876 @type address_list: list or None
877 @keyword address_list: an optional list of node addresses, in order
878 to optimize the RPC speed
881 file_contents = utils.ReadFile(file_name)
882 data = cls._Compress(file_contents)
883 st = os.stat(file_name)
884 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
885 st.st_atime, st.st_mtime]
886 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
887 address_list=address_list)
890 def call_write_ssconf_files(cls, node_list, values):
891 """Write ssconf files.
893 This is a multi-node call.
896 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
898 def call_os_diagnose(self, node_list):
899 """Request a diagnose of OS definitions.
901 This is a multi-node call.
904 return self._MultiNodeCall(node_list, "os_diagnose", [])
906 def call_os_get(self, node, name):
907 """Returns an OS definition.
909 This is a single-node call.
912 result = self._SingleNodeCall(node, "os_get", [name])
913 if not result.fail_msg and isinstance(result.payload, dict):
914 result.payload = objects.OS.FromDict(result.payload)
917 def call_hooks_runner(self, node_list, hpath, phase, env):
918 """Call the hooks runner.
921 - op: the OpCode instance
922 - env: a dictionary with the environment
924 This is a multi-node call.
927 params = [hpath, phase, env]
928 return self._MultiNodeCall(node_list, "hooks_runner", params)
930 def call_iallocator_runner(self, node, name, idata):
931 """Call an iallocator on a remote node
934 - name: the iallocator name
935 - input: the json-encoded input string
937 This is a single-node call.
940 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
942 def call_blockdev_grow(self, node, cf_bdev, amount):
943 """Request a snapshot of the given block device.
945 This is a single-node call.
948 return self._SingleNodeCall(node, "blockdev_grow",
949 [cf_bdev.ToDict(), amount])
951 def call_blockdev_export(self, node, cf_bdev,
952 dest_node, dest_path, cluster_name):
953 """Export a given disk to another node.
955 This is a single-node call.
958 return self._SingleNodeCall(node, "blockdev_export",
959 [cf_bdev.ToDict(), dest_node, dest_path,
962 def call_blockdev_snapshot(self, node, cf_bdev):
963 """Request a snapshot of the given block device.
965 This is a single-node call.
968 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
970 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
972 """Request the export of a given snapshot.
974 This is a single-node call.
977 return self._SingleNodeCall(node, "snapshot_export",
978 [snap_bdev.ToDict(), dest_node,
979 self._InstDict(instance), cluster_name, idx])
981 def call_finalize_export(self, node, instance, snap_disks):
982 """Request the completion of an export operation.
984 This writes the export config file, etc.
986 This is a single-node call.
990 for disk in snap_disks:
991 if isinstance(disk, bool):
992 flat_disks.append(disk)
994 flat_disks.append(disk.ToDict())
996 return self._SingleNodeCall(node, "finalize_export",
997 [self._InstDict(instance), flat_disks])
999 def call_export_info(self, node, path):
1000 """Queries the export information in a given path.
1002 This is a single-node call.
1005 return self._SingleNodeCall(node, "export_info", [path])
1007 def call_instance_os_import(self, node, inst, src_node, src_images,
1009 """Request the import of a backup into an instance.
1011 This is a single-node call.
1014 return self._SingleNodeCall(node, "instance_os_import",
1015 [self._InstDict(inst), src_node, src_images,
1018 def call_export_list(self, node_list):
1019 """Gets the stored exports list.
1021 This is a multi-node call.
1024 return self._MultiNodeCall(node_list, "export_list", [])
1026 def call_export_remove(self, node, export):
1027 """Requests removal of a given export.
1029 This is a single-node call.
1032 return self._SingleNodeCall(node, "export_remove", [export])
1035 def call_node_leave_cluster(cls, node):
1036 """Requests a node to clean the cluster information it has.
1038 This will remove the configuration information from the ganeti data
1041 This is a single-node call.
1044 return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1046 def call_node_volumes(self, node_list):
1047 """Gets all volumes on node(s).
1049 This is a multi-node call.
1052 return self._MultiNodeCall(node_list, "node_volumes", [])
1054 def call_node_demote_from_mc(self, node):
1055 """Demote a node from the master candidate role.
1057 This is a single-node call.
1060 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1063 def call_node_powercycle(self, node, hypervisor):
1064 """Tries to powercycle a node.
1066 This is a single-node call.
1069 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1072 def call_test_delay(self, node_list, duration):
1073 """Sleep for a fixed time on given node(s).
1075 This is a multi-node call.
1078 return self._MultiNodeCall(node_list, "test_delay", [duration])
1080 def call_file_storage_dir_create(self, node, file_storage_dir):
1081 """Create the given file storage directory.
1083 This is a single-node call.
1086 return self._SingleNodeCall(node, "file_storage_dir_create",
1089 def call_file_storage_dir_remove(self, node, file_storage_dir):
1090 """Remove the given file storage directory.
1092 This is a single-node call.
1095 return self._SingleNodeCall(node, "file_storage_dir_remove",
1098 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1099 new_file_storage_dir):
1100 """Rename file storage directory.
1102 This is a single-node call.
1105 return self._SingleNodeCall(node, "file_storage_dir_rename",
1106 [old_file_storage_dir, new_file_storage_dir])
1109 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1110 """Update job queue.
1112 This is a multi-node call.
1115 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1116 [file_name, cls._Compress(content)],
1117 address_list=address_list)
1120 def call_jobqueue_purge(cls, node):
1123 This is a single-node call.
1126 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1129 def call_jobqueue_rename(cls, node_list, address_list, rename):
1130 """Rename a job queue file.
1132 This is a multi-node call.
1135 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1136 address_list=address_list)
1139 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1140 """Set the drain flag on the queue.
1142 This is a multi-node call.
1144 @type node_list: list
1145 @param node_list: the list of nodes to query
1146 @type drain_flag: bool
1147 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1150 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1153 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1154 """Validate the hypervisor params.
1156 This is a multi-node call.
1158 @type node_list: list
1159 @param node_list: the list of nodes to query
1160 @type hvname: string
1161 @param hvname: the hypervisor name
1162 @type hvparams: dict
1163 @param hvparams: the hypervisor parameters to be validated
1166 cluster = self._cfg.GetClusterInfo()
1167 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1168 return self._MultiNodeCall(node_list, "hypervisor_validate_params",