4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
82 calls we can't raise an exception just because one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
87 @ivar failed: whether the operation failed at transport level (not
88 application level on the remote node)
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.fail_msg = "Node is marked offline"
107 self.data = self.payload = None
109 self.fail_msg = self._EnsureErr(data)
110 self.data = self.payload = None
113 if not isinstance(self.data, (tuple, list)):
114 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
117 self.fail_msg = ("RPC layer error: invalid result length (%d), "
118 "expected 2" % len(self.data))
119 elif not self.data[0]:
120 self.fail_msg = self._EnsureErr(self.data[1])
124 self.payload = data[1]
128 """Helper to ensure we return a 'True' value for error."""
132 return "No error information"
134 def Raise(self, msg, prereq=False):
135 """If the result has failed, raise an OpExecError.
137 This is used so that LU code doesn't have to check for each
138 result, but instead can call this function.
141 if not self.fail_msg:
144 if not msg: # one could pass None for default message
145 msg = ("Call '%s' to node '%s' has failed: %s" %
146 (self.call, self.node, self.fail_msg))
148 msg = "%s: %s" % (msg, self.fail_msg)
150 ec = errors.OpPrereqError
152 ec = errors.OpExecError
155 def RemoteFailMsg(self):
156 """Check if the remote procedure failed.
158 @return: the fail_msg attribute
167 This class, given a (remote) method name, a list of parameters and a
168 list of nodes, will contact (in parallel) all nodes, and return a
169 dict of results (key: node name, value: result).
171 One current bug is that generic failure is still signaled by
172 'False' result, which is not good. This overloading of values can
176 def __init__(self, procedure, body, port):
177 self.procedure = procedure
183 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184 ssl_cert_path=constants.SSL_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
    """Add a list of nodes to the target nodes.

    Each (name, address) pair is handed to C{self.ConnectNode}.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list

    """
    if address_list is None:
        # No addresses known: pair every node name with None.
        address_list = [None for _ in node_list]

    # Trivially true for the list built above, so checking it
    # unconditionally only constrains caller-supplied address lists.
    assert len(node_list) == len(address_list), \
        "Name and address lists should have the same length"
    for node, address in zip(node_list, address_list):
        self.ConnectNode(node, address)
204 def ConnectNode(self, name, address=None):
205 """Add a node to the target list.
208 @param name: the node name
210 @keyword address: the node address, if known
217 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218 "/%s" % self.procedure,
220 ssl_params=self._ssl_params,
221 ssl_verify_peer=True)
223 def GetResults(self):
224 """Call nodes and return results.
227 @return: List of RPC results
230 assert _http_manager, "RPC module not initialized"
232 _http_manager.ExecRequests(self.nc.values())
236 for name, req in self.nc.iteritems():
237 if req.success and req.resp_status_code == http.HTTP_OK:
238 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239 node=name, call=self.procedure)
242 # TODO: Better error reporting
248 logging.error("RPC error in %s from node %s: %s",
249 self.procedure, name, msg)
250 results[name] = RpcResult(data=msg, failed=True, node=name,
256 class RpcRunner(object):
257 """RPC runner class"""
259 def __init__(self, cfg):
260 """Initialize the rpc runner.
262 @type cfg: C{config.ConfigWriter}
263 @param cfg: the configuration object that will be used to get data
268 self.port = utils.GetDaemonPort(constants.NODED)
270 def _InstDict(self, instance, hvp=None, bep=None):
271 """Convert the given instance to a dict.
273 This is done via the instance's ToDict() method and additionally
274 we fill the hvparams with the cluster defaults.
276 @type instance: L{objects.Instance}
277 @param instance: an Instance object
278 @type hvp: dict or None
279 @param hvp: a dictionary with overridden hypervisor parameters
280 @type bep: dict or None
281 @param bep: a dictionary with overridden backend parameters
283 @return: the instance dict, with the hvparams filled with the
287 idict = instance.ToDict()
288 cluster = self._cfg.GetClusterInfo()
289 idict["hvparams"] = cluster.FillHV(instance)
291 idict["hvparams"].update(hvp)
292 idict["beparams"] = cluster.FillBE(instance)
294 idict["beparams"].update(bep)
295 for nic in idict["nics"]:
296 nic['nicparams'] = objects.FillDict(
297 cluster.nicparams[constants.PP_DEFAULT],
301 def _ConnectList(self, client, node_list, call):
302 """Helper for computing node addresses.
304 @type client: L{ganeti.rpc.Client}
305 @param client: a C{Client} instance
306 @type node_list: list
307 @param node_list: the node list we should connect
309 @param call: the name of the remote procedure call, for filling in
310 correctly any eventual offline nodes' results
313 all_nodes = self._cfg.GetAllNodesInfo()
317 for node in node_list:
318 if node in all_nodes:
319 if all_nodes[node].offline:
320 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
322 val = all_nodes[node].primary_ip
325 addr_list.append(val)
326 name_list.append(node)
328 client.ConnectList(name_list, address_list=addr_list)
331 def _ConnectNode(self, client, node, call):
332 """Helper for computing one node's address.
334 @type client: L{ganeti.rpc.Client}
335 @param client: a C{Client} instance
337 @param node: the node we should connect
339 @param call: the name of the remote procedure call, for filling in
340 correctly any eventual offline nodes' results
343 node_info = self._cfg.GetNodeInfo(node)
344 if node_info is not None:
345 if node_info.offline:
346 return RpcResult(node=node, offline=True, call=call)
347 addr = node_info.primary_ip
350 client.ConnectNode(node, address=addr)
352 def _MultiNodeCall(self, node_list, procedure, args):
353 """Helper for making a multi-node call
356 body = serializer.DumpJson(args, indent=False)
357 c = Client(procedure, body, self.port)
358 skip_dict = self._ConnectList(c, node_list, procedure)
359 skip_dict.update(c.GetResults())
363 def _StaticMultiNodeCall(cls, node_list, procedure, args,
365 """Helper for making a multi-node static call
368 body = serializer.DumpJson(args, indent=False)
369 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370 c.ConnectList(node_list, address_list=address_list)
371 return c.GetResults()
373 def _SingleNodeCall(self, node, procedure, args):
374 """Helper for making a single-node call
377 body = serializer.DumpJson(args, indent=False)
378 c = Client(procedure, body, self.port)
379 result = self._ConnectNode(c, node, procedure)
381 # we did connect, node is not offline
382 result = c.GetResults()[node]
386 def _StaticSingleNodeCall(cls, node, procedure, args):
387 """Helper for making a single-node static call
390 body = serializer.DumpJson(args, indent=False)
391 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
393 return c.GetResults()[node]
397 """Compresses a string for transport over RPC.
399 Small amounts of data are not compressed.
404 @return: Encoded data to send
407 # Small amounts of data are not compressed
409 return (constants.RPC_ENCODING_NONE, data)
411 # Compress with zlib and encode in base64
412 return (constants.RPC_ENCODING_ZLIB_BASE64,
413 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
    """Gets the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the volume group name, sent as the only RPC argument
    @return: the result of C{self._MultiNodeCall} for "lv_list"

    """
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
def call_vg_list(self, node_list):
    """Gets the volume group list.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for "vg_list"

    """
    return self._MultiNodeCall(node_list, "vg_list", [])
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Get list of storage units.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param su_name: first RPC argument
    @param su_args: second RPC argument
    @param name: third RPC argument
    @param fields: fourth RPC argument
    @return: the result of C{self._MultiNodeCall} for "storage_list"

    """
    return self._MultiNodeCall(node_list, "storage_list",
                               [su_name, su_args, name, fields])
def call_bridges_exist(self, node, bridges_list):
    """Checks if a node has all the bridges given.

    This method checks if all bridges given in the bridges_list are
    present on the remote node, so that an instance that uses interfaces
    on those bridges can be started.

    This is a single-node call.

    @param node: the node to query
    @param bridges_list: the bridges to check, sent as one RPC argument
    @return: the result of C{self._SingleNodeCall} for "bridges_exist"

    """
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
def call_instance_start(self, node, instance, hvp, bep):
    """Starts an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted through C{self._InstDict}
    @param hvp: passed to C{self._InstDict} as its C{hvp} keyword
    @param bep: passed to C{self._InstDict} as its C{bep} keyword
    @return: the result of C{self._SingleNodeCall} for "instance_start"

    """
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
    return self._SingleNodeCall(node, "instance_start", [idict])
def call_instance_shutdown(self, node, instance):
    """Stops an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted through C{self._InstDict}
    @return: the result of C{self._SingleNodeCall} for "instance_shutdown"

    """
    return self._SingleNodeCall(node, "instance_shutdown",
                                [self._InstDict(instance)])
def call_migration_info(self, node, instance):
    """Gather the information necessary to prepare an instance migration.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @return: the result of C{self._SingleNodeCall} for "migration_info"

    """
    return self._SingleNodeCall(node, "migration_info",
                                [self._InstDict(instance)])
def call_accept_instance(self, node, instance, info, target):
    """Prepare a node to accept an instance.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @param target: target hostname (usually ip address) (on the node itself)
    @return: the result of C{self._SingleNodeCall} for "accept_instance"

    """
    return self._SingleNodeCall(node, "accept_instance",
                                [self._InstDict(instance), info, target])
def call_finalize_migration(self, node, instance, info, success):
    """Finalize any target-node migration specific operation.

    This is called both in case of a successful migration and in case of
    error (in which case it should abort the migration).

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure
    @return: the result of C{self._SingleNodeCall} for "finalize_migration"

    """
    return self._SingleNodeCall(node, "finalize_migration",
                                [self._InstDict(instance), info, success])
def call_instance_migrate(self, node, instance, target, live):
    """Migrate an instance.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)
    @return: the result of C{self._SingleNodeCall} for "instance_migrate"

    """
    return self._SingleNodeCall(node, "instance_migrate",
                                [self._InstDict(instance), target, live])
def call_instance_reboot(self, node, instance, reboot_type):
    """Reboots an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted through C{self._InstDict}
    @param reboot_type: forwarded unchanged as the second RPC argument
    @return: the result of C{self._SingleNodeCall} for "instance_reboot"

    """
    return self._SingleNodeCall(node, "instance_reboot",
                                [self._InstDict(instance), reboot_type])
def call_instance_os_add(self, node, inst, reinstall):
    """Installs an OS on the given instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, converted through C{self._InstDict}
    @param reinstall: forwarded unchanged as the second RPC argument
    @return: the result of C{self._SingleNodeCall} for "instance_os_add"

    """
    return self._SingleNodeCall(node, "instance_os_add",
                                [self._InstDict(inst), reinstall])
def call_instance_run_rename(self, node, inst, old_name):
    """Run the OS rename script for an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, converted through C{self._InstDict}
    @param old_name: forwarded unchanged as the second RPC argument
    @return: the result of C{self._SingleNodeCall} for
        "instance_run_rename"

    """
    return self._SingleNodeCall(node, "instance_run_rename",
                                [self._InstDict(inst), old_name])
def call_instance_info(self, node, instance, hname):
    """Returns information about a single instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @param hname: the hypervisor type of the instance
    @return: the result of C{self._SingleNodeCall} for "instance_info"

    """
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
def call_instance_migratable(self, node, instance):
    """Checks whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check
    @return: the result of C{self._SingleNodeCall} for
        "instance_migratable"

    """
    return self._SingleNodeCall(node, "instance_migratable",
                                [self._InstDict(instance)])
602 def call_all_instances_info(self, node_list, hypervisor_list):
603 """Returns information about all instances on the given nodes.
605 This is a multi-node call.
607 @type node_list: list
608 @param node_list: the list of nodes to query
609 @type hypervisor_list: list
610 @param hypervisor_list: the hypervisors to query for instances
613 return self._MultiNodeCall(node_list, "all_instances_info",
def call_instance_list(self, node_list, hypervisor_list):
    """Returns the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances
    @return: the result of C{self._MultiNodeCall} for "instance_list"

    """
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
629 def call_node_tcp_ping(self, node, source, target, port, timeout,
631 """Do a TcpPing on the remote node
633 This is a single-node call.
636 return self._SingleNodeCall(node, "node_tcp_ping",
637 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
    """Checks if a node has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to check, sent as the only RPC argument
    @return: the result of C{self._SingleNodeCall} for
        "node_has_ip_address"

    """
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Return node information.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask
    @return: the result of C{self._MultiNodeCall} for "node_info"

    """
    return self._MultiNodeCall(node_list, "node_info",
                               [vg_name, hypervisor_type])
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Add a node to the cluster.

    This is a single-node call.

    The six key arguments are forwarded in the order they are declared.

    @param node: the node to contact
    @return: the result of C{self._SingleNodeCall} for "node_add"

    """
    return self._SingleNodeCall(node, "node_add",
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Request verification of given parameters.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param checkdict: forwarded unchanged as the first RPC argument
    @param cluster_name: forwarded unchanged as the second RPC argument
    @return: the result of C{self._MultiNodeCall} for "node_verify"

    """
    return self._MultiNodeCall(node_list, "node_verify",
                               [checkdict, cluster_name])
def call_node_start_master(cls, node, start_daemons, no_voting):
    """Tells a node to activate itself as a master.

    This is a single-node call.

    @param node: the node to contact
    @param start_daemons: forwarded unchanged as the first RPC argument
    @param no_voting: forwarded unchanged as the second RPC argument
    @return: the result of C{cls._StaticSingleNodeCall} for
        "node_start_master"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticSingleNodeCall(node, "node_start_master",
                                     [start_daemons, no_voting])
def call_node_stop_master(cls, node, stop_daemons):
    """Tells a node to demote itself from master status.

    This is a single-node call.

    @param node: the node to contact
    @param stop_daemons: forwarded unchanged as the only RPC argument
    @return: the result of C{cls._StaticSingleNodeCall} for
        "node_stop_master"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
def call_master_info(cls, node_list):
    """Query master info.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of C{cls._StaticMultiNodeCall} for "master_info"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    # TODO: should this method query down nodes?
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
def call_version(self, node_list):
    """Query node version.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for "version"

    """
    return self._MultiNodeCall(node_list, "version", [])
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the device; serialized through its C{ToDict} method
    @param size: forwarded unchanged
    @param owner: forwarded unchanged
    @param on_primary: forwarded unchanged
    @param info: forwarded unchanged
    @return: the result of C{self._SingleNodeCall} for "blockdev_create"

    """
    return self._SingleNodeCall(node, "blockdev_create",
                                [bdev.ToDict(), size, owner, on_primary, info])
def call_blockdev_remove(self, node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the device; serialized through its C{ToDict} method
    @return: the result of C{self._SingleNodeCall} for "blockdev_remove"

    """
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
def call_blockdev_rename(self, node, devlist):
    """Request rename of the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param devlist: iterable of (device, uid) pairs; each device is
        serialized through its C{ToDict} method and the resulting list of
        pairs is used directly as the RPC argument list
    @return: the result of C{self._SingleNodeCall} for "blockdev_rename"

    """
    return self._SingleNodeCall(node, "blockdev_rename",
                                [(d.ToDict(), uid) for d, uid in devlist])
def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Request assembling of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the device; serialized through its C{ToDict} method
    @param owner: forwarded unchanged
    @param on_primary: forwarded unchanged
    @return: the result of C{self._SingleNodeCall} for "blockdev_assemble"

    """
    return self._SingleNodeCall(node, "blockdev_assemble",
                                [disk.ToDict(), owner, on_primary])
def call_blockdev_shutdown(self, node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the device; serialized through its C{ToDict} method
    @return: the result of C{self._SingleNodeCall} for "blockdev_shutdown"

    """
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
767 def call_blockdev_addchildren(self, node, bdev, ndevs):
768 """Request adding a list of children to a (mirroring) device.
770 This is a single-node call.
773 return self._SingleNodeCall(node, "blockdev_addchildren",
775 [disk.ToDict() for disk in ndevs]])
777 def call_blockdev_removechildren(self, node, bdev, ndevs):
778 """Request removing a list of children from a (mirroring) device.
780 This is a single-node call.
783 return self._SingleNodeCall(node, "blockdev_removechildren",
785 [disk.ToDict() for disk in ndevs]])
def call_blockdev_getmirrorstatus(self, node, disks):
    """Request status of a (mirroring) device.

    This is a single-node call.

    @param node: the node to query
    @param disks: iterable of devices; each is serialized through its
        C{ToDict} method and the resulting list is used directly as the
        RPC argument list
    @return: the result of C{self._SingleNodeCall} for
        "blockdev_getmirrorstatus"

    """
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                [dsk.ToDict() for dsk in disks])
def call_blockdev_find(self, node, disk):
    """Request identification of a given block device.

    This is a single-node call.

    @param node: the node to query
    @param disk: the device; serialized through its C{ToDict} method
    @return: the result of C{self._SingleNodeCall} for "blockdev_find"

    """
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
def call_blockdev_close(self, node, instance_name, disks):
    """Closes the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param instance_name: forwarded unchanged as the first RPC argument
    @param disks: iterable of devices; each is serialized through its
        C{ToDict} method and the list is sent as the second RPC argument
    @return: the result of C{self._SingleNodeCall} for "blockdev_close"

    """
    params = [instance_name, [cf.ToDict() for cf in disks]]
    return self._SingleNodeCall(node, "blockdev_close", params)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnects the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: forwarded unchanged as the first RPC argument
    @param disks: iterable of devices; each is serialized through its
        C{ToDict} method and the list is sent as the second RPC argument
    @return: the result of C{self._MultiNodeCall} for
        "drbd_disconnect_net"

    """
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, [cf.ToDict() for cf in disks]])
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Attaches the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: forwarded unchanged as the first RPC argument
    @param disks: iterable of devices; each is serialized through its
        C{ToDict} method and the list is sent as the second RPC argument
    @param instance_name: forwarded unchanged as the third RPC argument
    @param multimaster: forwarded unchanged as the fourth RPC argument
    @return: the result of C{self._MultiNodeCall} for "drbd_attach_net"

    """
    return self._MultiNodeCall(node_list, "drbd_attach_net",
                               [nodes_ip, [cf.ToDict() for cf in disks],
                                instance_name, multimaster])
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Waits until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: forwarded unchanged as the first RPC argument
    @param disks: iterable of devices; each is serialized through its
        C{ToDict} method and the list is sent as the second RPC argument
    @return: the result of C{self._MultiNodeCall} for "drbd_wait_sync"

    """
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
                               [nodes_ip, [cf.ToDict() for cf in disks]])
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a file to the given nodes.

    The file is read locally, its contents are encoded with
    C{cls._Compress}, and they are sent together with the file's mode,
    uid, gid, atime and mtime as reported by C{os.stat}.

    NOTE(review): the remote node may refuse files it does not accept;
    the exact rule is not visible here -- confirm.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed
    @return: the result of C{cls._StaticMultiNodeCall} for "upload_file"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    file_contents = utils.ReadFile(file_name)
    data = cls._Compress(file_contents)
    st = os.stat(file_name)
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
              st.st_atime, st.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                    address_list=address_list)
def call_write_ssconf_files(cls, node_list, values):
    """Write ssconf files.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param values: forwarded unchanged as the only RPC argument
    @return: the result of C{cls._StaticMultiNodeCall} for
        "write_ssconf_files"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
def call_os_diagnose(self, node_list):
    """Request a diagnose of OS definitions.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for "os_diagnose"

    """
    return self._MultiNodeCall(node_list, "os_diagnose", [])
885 def call_os_get(self, node, name):
886 """Returns an OS definition.
888 This is a single-node call.
891 result = self._SingleNodeCall(node, "os_get", [name])
892 if not result.failed and isinstance(result.data, dict):
893 result.data = objects.OS.FromDict(result.data)
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param hpath: forwarded unchanged as the first RPC argument
    @param phase: forwarded unchanged as the second RPC argument
    @param env: a dictionary with the environment
    @return: the result of C{self._MultiNodeCall} for "hooks_runner"

    """
    params = [hpath, phase, env]
    return self._MultiNodeCall(node_list, "hooks_runner", params)
def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: the node to contact
    @param name: the iallocator name
    @param idata: the json-encoded input string
    @return: the result of C{self._SingleNodeCall} for "iallocator_runner"

    """
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing of the given block device.

    This is a single-node call.

    @param node: the node to contact
    @param cf_bdev: the device; serialized through its C{ToDict} method
    @param amount: forwarded unchanged as the second RPC argument
    @return: the result of C{self._SingleNodeCall} for "blockdev_grow"

    """
    return self._SingleNodeCall(node, "blockdev_grow",
                                [cf_bdev.ToDict(), amount])
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param node: the node to contact
    @param cf_bdev: the device; serialized through its C{ToDict} method
    @return: the result of C{self._SingleNodeCall} for "blockdev_snapshot"

    """
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
938 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
940 """Request the export of a given snapshot.
942 This is a single-node call.
945 return self._SingleNodeCall(node, "snapshot_export",
946 [snap_bdev.ToDict(), dest_node,
947 self._InstDict(instance), cluster_name, idx])
949 def call_finalize_export(self, node, instance, snap_disks):
950 """Request the completion of an export operation.
952 This writes the export config file, etc.
954 This is a single-node call.
958 for disk in snap_disks:
959 if isinstance(disk, bool):
960 flat_disks.append(disk)
962 flat_disks.append(disk.ToDict())
964 return self._SingleNodeCall(node, "finalize_export",
965 [self._InstDict(instance), flat_disks])
def call_export_info(self, node, path):
    """Queries the export information in a given path.

    This is a single-node call.

    @param node: the node to query
    @param path: the path to inspect, sent as the only RPC argument
    @return: the result of C{self._SingleNodeCall} for "export_info"

    """
    return self._SingleNodeCall(node, "export_info", [path])
975 def call_instance_os_import(self, node, inst, src_node, src_images,
977 """Request the import of a backup into an instance.
979 This is a single-node call.
982 return self._SingleNodeCall(node, "instance_os_import",
983 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
    """Gets the stored exports list.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for "export_list"

    """
    return self._MultiNodeCall(node_list, "export_list", [])
def call_export_remove(self, node, export):
    """Requests removal of a given export.

    This is a single-node call.

    @param node: the node to contact
    @param export: the export to remove, sent as the only RPC argument
    @return: the result of C{self._SingleNodeCall} for "export_remove"

    """
    return self._SingleNodeCall(node, "export_remove", [export])
def call_node_leave_cluster(cls, node):
    """Requests a node to clean the cluster information it has.

    This is a single-node call.

    @param node: the node to contact
    @return: the result of C{cls._StaticSingleNodeCall} for
        "node_leave_cluster"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
def call_node_volumes(self, node_list):
    """Gets all volumes on node(s).

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for "node_volumes"

    """
    return self._MultiNodeCall(node_list, "node_volumes", [])
def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: the node to contact
    @return: the result of C{self._SingleNodeCall} for
        "node_demote_from_mc"

    """
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
def call_node_powercycle(self, node, hypervisor):
    """Tries to powercycle a node.

    This is a single-node call.

    @param node: the node to contact
    @param hypervisor: forwarded unchanged as the only RPC argument
    @return: the result of C{self._SingleNodeCall} for "node_powercycle"

    """
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param duration: forwarded unchanged as the only RPC argument
    @return: the result of C{self._MultiNodeCall} for "test_delay"

    """
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1048 def call_file_storage_dir_create(self, node, file_storage_dir):
1049 """Create the given file storage directory.
1051 This is a single-node call.
1054 return self._SingleNodeCall(node, "file_storage_dir_create",
1057 def call_file_storage_dir_remove(self, node, file_storage_dir):
1058 """Remove the given file storage directory.
1060 This is a single-node call.
1063 return self._SingleNodeCall(node, "file_storage_dir_remove",
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename file storage directory.

    This is a single-node call.

    @param node: the node to contact
    @param old_file_storage_dir: sent as the first RPC argument
    @param new_file_storage_dir: sent as the second RPC argument
    @return: the result of C{self._SingleNodeCall} for
        "file_storage_dir_rename"

    """
    return self._SingleNodeCall(node, "file_storage_dir_rename",
                                [old_file_storage_dir, new_file_storage_dir])
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update job queue.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param address_list: forwarded as the C{address_list} keyword of
        C{cls._StaticMultiNodeCall}
    @param file_name: sent unchanged as the first RPC argument
    @param content: encoded with C{cls._Compress} and sent as the second
        RPC argument
    @return: the result of C{cls._StaticMultiNodeCall} for
        "jobqueue_update"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                    [file_name, cls._Compress(content)],
                                    address_list=address_list)
def call_jobqueue_purge(cls, node):
    """Purge job queue.

    This is a single-node call.

    @param node: the node to contact
    @return: the result of C{cls._StaticSingleNodeCall} for
        "jobqueue_purge"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param address_list: forwarded as the C{address_list} keyword of
        C{cls._StaticMultiNodeCall}
    @param rename: used as-is as the RPC argument list (it is not wrapped
        in another list)
    @return: the result of C{cls._StaticMultiNodeCall} for
        "jobqueue_rename"

    """
    # NOTE(review): takes cls, so presumably bound through a classmethod
    # decorator on a line that is not visible here -- confirm.
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                    address_list=address_list)
1107 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1108 """Set the drain flag on the queue.
1110 This is a multi-node call.
1112 @type node_list: list
1113 @param node_list: the list of nodes to query
1114 @type drain_flag: bool
1115 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1118 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1121 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1122 """Validate the hypervisor params.
1124 This is a multi-node call.
1126 @type node_list: list
1127 @param node_list: the list of nodes to query
1128 @type hvname: string
1129 @param hvname: the hypervisor name
1130 @type hvparams: dict
1131 @param hvparams: the hypervisor parameters to be validated
1134 cluster = self._cfg.GetClusterInfo()
1135 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1136 return self._MultiNodeCall(node_list, "hypervisor_validate_params",