4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
46 import ganeti.http.client
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
61 assert not _http_manager, "RPC module initialized more than once"
63 _http_manager = http.client.HttpClientManager()
67 """Stops the module-global HTTP client manager.
69 Must be called before quitting the program.
75 _http_manager.Shutdown()
79 class RpcResult(object):
82 This class holds an RPC result. It is needed since in multi-node
83 calls we can't raise an exception just because one one out of many
84 failed, and therefore we use this class to encapsulate the result.
86 @ivar data: the data payload, for successfull results, or None
88 @ivar failed: whether the operation failed at RPC level (not
89 application level on the remote node)
90 @ivar call: the name of the RPC call
91 @ivar node: the name of the node to which we made the call
92 @ivar offline: whether the operation failed because the node was
93 offline, as opposed to actual failure; offline=True will always
94 imply failed=True, in order to allow simpler checking if
95 the user doesn't care about the exact failure mode
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.error = "Node is marked offline"
107 self.data = self.payload = None
110 self.data = self.payload = None
114 if isinstance(data, (tuple, list)) and len(data) == 2:
115 self.payload = data[1]
120 """If the result has failed, raise an OpExecError.
122 This is used so that LU code doesn't have to check for each
123 result, but instead can call this function.
127 raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128 (self.call, self.node, self.error))
130 def RemoteFailMsg(self):
131 """Check if the remote procedure failed.
133 This is valid only for RPC calls which return result of the form
134 (status, data | error_msg).
136 @return: empty string for succcess, otherwise an error message
140 """Helper to ensure we return a 'True' value for error."""
144 return "No error information"
147 return _EnsureErr(self.error)
148 if not isinstance(self.data, (tuple, list)):
149 return "Invalid result type (%s)" % type(self.data)
150 if len(self.data) != 2:
151 return "Invalid result length (%d), expected 2" % len(self.data)
153 return _EnsureErr(self.data[1])
160 This class, given a (remote) method name, a list of parameters and a
161 list of nodes, will contact (in parallel) all nodes, and return a
162 dict of results (key: node name, value: result).
164 One current bug is that generic failure is still signalled by
165 'False' result, which is not good. This overloading of values can
169 def __init__(self, procedure, body, port):
170 self.procedure = procedure
176 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177 ssl_cert_path=constants.SSL_CERT_FILE)
179 def ConnectList(self, node_list, address_list=None):
180 """Add a list of nodes to the target nodes.
182 @type node_list: list
183 @param node_list: the list of node names to connect
184 @type address_list: list or None
185 @keyword address_list: either None or a list with node addresses,
186 which must have the same length as the node list
189 if address_list is None:
190 address_list = [None for _ in node_list]
192 assert len(node_list) == len(address_list), \
193 "Name and address lists should have the same length"
194 for node, address in zip(node_list, address_list):
195 self.ConnectNode(node, address)
197 def ConnectNode(self, name, address=None):
198 """Add a node to the target list.
201 @param name: the node name
203 @keyword address: the node address, if known
210 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211 "/%s" % self.procedure,
213 ssl_params=self._ssl_params,
214 ssl_verify_peer=True)
216 def GetResults(self):
217 """Call nodes and return results.
220 @return: List of RPC results
223 assert _http_manager, "RPC module not intialized"
225 _http_manager.ExecRequests(self.nc.values())
229 for name, req in self.nc.iteritems():
230 if req.success and req.resp_status_code == http.HTTP_OK:
231 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232 node=name, call=self.procedure)
235 # TODO: Better error reporting
241 logging.error("RPC error in %s from node %s: %s",
242 self.procedure, name, msg)
243 results[name] = RpcResult(data=msg, failed=True, node=name,
249 class RpcRunner(object):
250 """RPC runner class"""
252 def __init__(self, cfg):
253 """Initialized the rpc runner.
255 @type cfg: C{config.ConfigWriter}
256 @param cfg: the configuration object that will be used to get data
261 self.port = utils.GetNodeDaemonPort()
263 def _InstDict(self, instance, hvp=None, bep=None):
264 """Convert the given instance to a dict.
266 This is done via the instance's ToDict() method and additionally
267 we fill the hvparams with the cluster defaults.
269 @type instance: L{objects.Instance}
270 @param instance: an Instance object
271 @type hvp: dict or None
272 @param hvp: a dictionary with overriden hypervisor parameters
273 @type bep: dict or None
274 @param bep: a dictionary with overriden backend parameters
276 @return: the instance dict, with the hvparams filled with the
280 idict = instance.ToDict()
281 cluster = self._cfg.GetClusterInfo()
282 idict["hvparams"] = cluster.FillHV(instance)
284 idict["hvparams"].update(hvp)
285 idict["beparams"] = cluster.FillBE(instance)
287 idict["beparams"].update(bep)
288 for nic in idict["nics"]:
289 nic['nicparams'] = objects.FillDict(
290 cluster.nicparams[constants.PP_DEFAULT],
294 def _ConnectList(self, client, node_list, call):
295 """Helper for computing node addresses.
297 @type client: L{Client}
298 @param client: a C{Client} instance
299 @type node_list: list
300 @param node_list: the node list we should connect
302 @param call: the name of the remote procedure call, for filling in
303 correctly any eventual offline nodes' results
306 all_nodes = self._cfg.GetAllNodesInfo()
310 for node in node_list:
311 if node in all_nodes:
312 if all_nodes[node].offline:
313 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
315 val = all_nodes[node].primary_ip
318 addr_list.append(val)
319 name_list.append(node)
321 client.ConnectList(name_list, address_list=addr_list)
324 def _ConnectNode(self, client, node, call):
325 """Helper for computing one node's address.
327 @type client: L{Client}
328 @param client: a C{Client} instance
330 @param node: the node we should connect
332 @param call: the name of the remote procedure call, for filling in
333 correctly any eventual offline nodes' results
336 node_info = self._cfg.GetNodeInfo(node)
337 if node_info is not None:
338 if node_info.offline:
339 return RpcResult(node=node, offline=True, call=call)
340 addr = node_info.primary_ip
343 client.ConnectNode(node, address=addr)
345 def _MultiNodeCall(self, node_list, procedure, args):
346 """Helper for making a multi-node call
349 body = serializer.DumpJson(args, indent=False)
350 c = Client(procedure, body, self.port)
351 skip_dict = self._ConnectList(c, node_list, procedure)
352 skip_dict.update(c.GetResults())
356 def _StaticMultiNodeCall(cls, node_list, procedure, args,
358 """Helper for making a multi-node static call
361 body = serializer.DumpJson(args, indent=False)
362 c = Client(procedure, body, utils.GetNodeDaemonPort())
363 c.ConnectList(node_list, address_list=address_list)
364 return c.GetResults()
366 def _SingleNodeCall(self, node, procedure, args):
367 """Helper for making a single-node call
370 body = serializer.DumpJson(args, indent=False)
371 c = Client(procedure, body, self.port)
372 result = self._ConnectNode(c, node, procedure)
374 # we did connect, node is not offline
375 result = c.GetResults()[node]
379 def _StaticSingleNodeCall(cls, node, procedure, args):
380 """Helper for making a single-node static call
383 body = serializer.DumpJson(args, indent=False)
384 c = Client(procedure, body, utils.GetNodeDaemonPort())
386 return c.GetResults()[node]
390 """Compresses a string for transport over RPC.
392 Small amounts of data are not compressed.
397 @return: Encoded data to send
400 # Small amounts of data are not compressed
402 return (constants.RPC_ENCODING_NONE, data)
404 # Compress with zlib and encode in base64
405 return (constants.RPC_ENCODING_ZLIB_BASE64,
406 base64.b64encode(zlib.compress(data, 3)))
412 def call_volume_list(self, node_list, vg_name):
413 """Gets the logical volumes present in a given volume group.
415 This is a multi-node call.
418 return self._MultiNodeCall(node_list, "volume_list", [vg_name])
420 def call_vg_list(self, node_list):
421 """Gets the volume group list.
423 This is a multi-node call.
426 return self._MultiNodeCall(node_list, "vg_list", [])
428 def call_bridges_exist(self, node, bridges_list):
429 """Checks if a node has all the bridges given.
431 This method checks if all bridges given in the bridges_list are
432 present on the remote node, so that an instance that uses interfaces
433 on those bridges can be started.
435 This is a single-node call.
438 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
440 def call_instance_start(self, node, instance, hvp, bep):
441 """Starts an instance.
443 This is a single-node call.
446 idict = self._InstDict(instance, hvp=hvp, bep=bep)
447 return self._SingleNodeCall(node, "instance_start", [idict])
449 def call_instance_shutdown(self, node, instance):
450 """Stops an instance.
452 This is a single-node call.
455 return self._SingleNodeCall(node, "instance_shutdown",
456 [self._InstDict(instance)])
458 def call_migration_info(self, node, instance):
459 """Gather the information necessary to prepare an instance migration.
461 This is a single-node call.
464 @param node: the node on which the instance is currently running
465 @type instance: C{objects.Instance}
466 @param instance: the instance definition
469 return self._SingleNodeCall(node, "migration_info",
470 [self._InstDict(instance)])
472 def call_accept_instance(self, node, instance, info, target):
473 """Prepare a node to accept an instance.
475 This is a single-node call.
478 @param node: the target node for the migration
479 @type instance: C{objects.Instance}
480 @param instance: the instance definition
481 @type info: opaque/hypervisor specific (string/data)
482 @param info: result for the call_migration_info call
484 @param target: target hostname (usually ip address) (on the node itself)
487 return self._SingleNodeCall(node, "accept_instance",
488 [self._InstDict(instance), info, target])
490 def call_finalize_migration(self, node, instance, info, success):
491 """Finalize any target-node migration specific operation.
493 This is called both in case of a successful migration and in case of error
494 (in which case it should abort the migration).
496 This is a single-node call.
499 @param node: the target node for the migration
500 @type instance: C{objects.Instance}
501 @param instance: the instance definition
502 @type info: opaque/hypervisor specific (string/data)
503 @param info: result for the call_migration_info call
504 @type success: boolean
505 @param success: whether the migration was a success or a failure
508 return self._SingleNodeCall(node, "finalize_migration",
509 [self._InstDict(instance), info, success])
511 def call_instance_migrate(self, node, instance, target, live):
512 """Migrate an instance.
514 This is a single-node call.
517 @param node: the node on which the instance is currently running
518 @type instance: C{objects.Instance}
519 @param instance: the instance definition
521 @param target: the target node name
523 @param live: whether the migration should be done live or not (the
524 interpretation of this parameter is left to the hypervisor)
527 return self._SingleNodeCall(node, "instance_migrate",
528 [self._InstDict(instance), target, live])
530 def call_instance_reboot(self, node, instance, reboot_type):
531 """Reboots an instance.
533 This is a single-node call.
536 return self._SingleNodeCall(node, "instance_reboot",
537 [self._InstDict(instance), reboot_type])
539 def call_instance_os_add(self, node, inst, reinstall):
540 """Installs an OS on the given instance.
542 This is a single-node call.
545 return self._SingleNodeCall(node, "instance_os_add",
546 [self._InstDict(inst), reinstall])
548 def call_instance_run_rename(self, node, inst, old_name):
549 """Run the OS rename script for an instance.
551 This is a single-node call.
554 return self._SingleNodeCall(node, "instance_run_rename",
555 [self._InstDict(inst), old_name])
557 def call_instance_info(self, node, instance, hname):
558 """Returns information about a single instance.
560 This is a single-node call.
563 @param node: the list of nodes to query
564 @type instance: string
565 @param instance: the instance name
567 @param hname: the hypervisor type of the instance
570 return self._SingleNodeCall(node, "instance_info", [instance, hname])
572 def call_instance_migratable(self, node, instance):
573 """Checks whether the given instance can be migrated.
575 This is a single-node call.
577 @param node: the node to query
578 @type instance: L{objects.Instance}
579 @param instance: the instance to check
583 return self._SingleNodeCall(node, "instance_migratable",
584 [self._InstDict(instance)])
586 def call_all_instances_info(self, node_list, hypervisor_list):
587 """Returns information about all instances on the given nodes.
589 This is a multi-node call.
591 @type node_list: list
592 @param node_list: the list of nodes to query
593 @type hypervisor_list: list
594 @param hypervisor_list: the hypervisors to query for instances
597 return self._MultiNodeCall(node_list, "all_instances_info",
600 def call_instance_list(self, node_list, hypervisor_list):
601 """Returns the list of running instances on a given node.
603 This is a multi-node call.
605 @type node_list: list
606 @param node_list: the list of nodes to query
607 @type hypervisor_list: list
608 @param hypervisor_list: the hypervisors to query for instances
611 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
613 def call_node_tcp_ping(self, node, source, target, port, timeout,
615 """Do a TcpPing on the remote node
617 This is a single-node call.
620 return self._SingleNodeCall(node, "node_tcp_ping",
621 [source, target, port, timeout,
624 def call_node_has_ip_address(self, node, address):
625 """Checks if a node has the given IP address.
627 This is a single-node call.
630 return self._SingleNodeCall(node, "node_has_ip_address", [address])
632 def call_node_info(self, node_list, vg_name, hypervisor_type):
633 """Return node information.
635 This will return memory information and volume group size and free
638 This is a multi-node call.
640 @type node_list: list
641 @param node_list: the list of nodes to query
642 @type vg_name: C{string}
643 @param vg_name: the name of the volume group to ask for disk space
645 @type hypervisor_type: C{str}
646 @param hypervisor_type: the name of the hypervisor to ask for
650 retux = self._MultiNodeCall(node_list, "node_info",
651 [vg_name, hypervisor_type])
653 for result in retux.itervalues():
654 if result.failed or not isinstance(result.data, dict):
659 log_name = "call_node_info"
661 utils.CheckDict(result.data, {
662 'memory_total' : '-',
665 'vg_size' : 'node_unreachable',
670 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
671 """Add a node to the cluster.
673 This is a single-node call.
676 return self._SingleNodeCall(node, "node_add",
677 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
679 def call_node_verify(self, node_list, checkdict, cluster_name):
680 """Request verification of given parameters.
682 This is a multi-node call.
685 return self._MultiNodeCall(node_list, "node_verify",
686 [checkdict, cluster_name])
689 def call_node_start_master(cls, node, start_daemons):
690 """Tells a node to activate itself as a master.
692 This is a single-node call.
695 return cls._StaticSingleNodeCall(node, "node_start_master",
699 def call_node_stop_master(cls, node, stop_daemons):
700 """Tells a node to demote itself from master status.
702 This is a single-node call.
705 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
708 def call_master_info(cls, node_list):
709 """Query master info.
711 This is a multi-node call.
714 # TODO: should this method query down nodes?
715 return cls._StaticMultiNodeCall(node_list, "master_info", [])
717 def call_version(self, node_list):
718 """Query node version.
720 This is a multi-node call.
723 return self._MultiNodeCall(node_list, "version", [])
725 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
726 """Request creation of a given block device.
728 This is a single-node call.
731 return self._SingleNodeCall(node, "blockdev_create",
732 [bdev.ToDict(), size, owner, on_primary, info])
734 def call_blockdev_remove(self, node, bdev):
735 """Request removal of a given block device.
737 This is a single-node call.
740 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
742 def call_blockdev_rename(self, node, devlist):
743 """Request rename of the given block devices.
745 This is a single-node call.
748 return self._SingleNodeCall(node, "blockdev_rename",
749 [(d.ToDict(), uid) for d, uid in devlist])
751 def call_blockdev_assemble(self, node, disk, owner, on_primary):
752 """Request assembling of a given block device.
754 This is a single-node call.
757 return self._SingleNodeCall(node, "blockdev_assemble",
758 [disk.ToDict(), owner, on_primary])
760 def call_blockdev_shutdown(self, node, disk):
761 """Request shutdown of a given block device.
763 This is a single-node call.
766 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
768 def call_blockdev_addchildren(self, node, bdev, ndevs):
769 """Request adding a list of children to a (mirroring) device.
771 This is a single-node call.
774 return self._SingleNodeCall(node, "blockdev_addchildren",
776 [disk.ToDict() for disk in ndevs]])
778 def call_blockdev_removechildren(self, node, bdev, ndevs):
779 """Request removing a list of children from a (mirroring) device.
781 This is a single-node call.
784 return self._SingleNodeCall(node, "blockdev_removechildren",
786 [disk.ToDict() for disk in ndevs]])
788 def call_blockdev_getmirrorstatus(self, node, disks):
789 """Request status of a (mirroring) device.
791 This is a single-node call.
794 return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
795 [dsk.ToDict() for dsk in disks])
797 def call_blockdev_find(self, node, disk):
798 """Request identification of a given block device.
800 This is a single-node call.
803 return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
805 def call_blockdev_close(self, node, instance_name, disks):
806 """Closes the given block devices.
808 This is a single-node call.
811 params = [instance_name, [cf.ToDict() for cf in disks]]
812 return self._SingleNodeCall(node, "blockdev_close", params)
814 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
815 """Disconnects the network of the given drbd devices.
817 This is a multi-node call.
820 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
821 [nodes_ip, [cf.ToDict() for cf in disks]])
823 def call_drbd_attach_net(self, node_list, nodes_ip,
824 disks, instance_name, multimaster):
825 """Disconnects the given drbd devices.
827 This is a multi-node call.
830 return self._MultiNodeCall(node_list, "drbd_attach_net",
831 [nodes_ip, [cf.ToDict() for cf in disks],
832 instance_name, multimaster])
834 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
835 """Waits for the synchronization of drbd devices is complete.
837 This is a multi-node call.
840 return self._MultiNodeCall(node_list, "drbd_wait_sync",
841 [nodes_ip, [cf.ToDict() for cf in disks]])
844 def call_upload_file(cls, node_list, file_name, address_list=None):
847 The node will refuse the operation in case the file is not on the
850 This is a multi-node call.
852 @type node_list: list
853 @param node_list: the list of node names to upload to
855 @param file_name: the filename to upload
856 @type address_list: list or None
857 @keyword address_list: an optional list of node addresses, in order
858 to optimize the RPC speed
861 file_contents = utils.ReadFile(file_name)
862 data = cls._Compress(file_contents)
863 st = os.stat(file_name)
864 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
865 st.st_atime, st.st_mtime]
866 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
867 address_list=address_list)
870 def call_write_ssconf_files(cls, node_list, values):
871 """Write ssconf files.
873 This is a multi-node call.
876 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
878 def call_os_diagnose(self, node_list):
879 """Request a diagnose of OS definitions.
881 This is a multi-node call.
884 result = self._MultiNodeCall(node_list, "os_diagnose", [])
886 for node_result in result.values():
887 if not node_result.failed and node_result.data:
888 node_result.data = [objects.OS.FromDict(oss)
889 for oss in node_result.data]
892 def call_os_get(self, node, name):
893 """Returns an OS definition.
895 This is a single-node call.
898 result = self._SingleNodeCall(node, "os_get", [name])
899 if not result.failed and isinstance(result.data, dict):
900 result.data = objects.OS.FromDict(result.data)
903 def call_hooks_runner(self, node_list, hpath, phase, env):
904 """Call the hooks runner.
907 - op: the OpCode instance
908 - env: a dictionary with the environment
910 This is a multi-node call.
913 params = [hpath, phase, env]
914 return self._MultiNodeCall(node_list, "hooks_runner", params)
916 def call_iallocator_runner(self, node, name, idata):
917 """Call an iallocator on a remote node
920 - name: the iallocator name
921 - input: the json-encoded input string
923 This is a single-node call.
926 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
928 def call_blockdev_grow(self, node, cf_bdev, amount):
929 """Request a snapshot of the given block device.
931 This is a single-node call.
934 return self._SingleNodeCall(node, "blockdev_grow",
935 [cf_bdev.ToDict(), amount])
937 def call_blockdev_snapshot(self, node, cf_bdev):
938 """Request a snapshot of the given block device.
940 This is a single-node call.
943 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
945 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
947 """Request the export of a given snapshot.
949 This is a single-node call.
952 return self._SingleNodeCall(node, "snapshot_export",
953 [snap_bdev.ToDict(), dest_node,
954 self._InstDict(instance), cluster_name, idx])
956 def call_finalize_export(self, node, instance, snap_disks):
957 """Request the completion of an export operation.
959 This writes the export config file, etc.
961 This is a single-node call.
965 for disk in snap_disks:
966 flat_disks.append(disk.ToDict())
968 return self._SingleNodeCall(node, "finalize_export",
969 [self._InstDict(instance), flat_disks])
971 def call_export_info(self, node, path):
972 """Queries the export information in a given path.
974 This is a single-node call.
977 return self._SingleNodeCall(node, "export_info", [path])
979 def call_instance_os_import(self, node, inst, src_node, src_images,
981 """Request the import of a backup into an instance.
983 This is a single-node call.
986 return self._SingleNodeCall(node, "instance_os_import",
987 [self._InstDict(inst), src_node, src_images,
990 def call_export_list(self, node_list):
991 """Gets the stored exports list.
993 This is a multi-node call.
996 return self._MultiNodeCall(node_list, "export_list", [])
998 def call_export_remove(self, node, export):
999 """Requests removal of a given export.
1001 This is a single-node call.
1004 return self._SingleNodeCall(node, "export_remove", [export])
1007 def call_node_leave_cluster(cls, node):
1008 """Requests a node to clean the cluster information it has.
1010 This will remove the configuration information from the ganeti data
1013 This is a single-node call.
1016 return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1018 def call_node_volumes(self, node_list):
1019 """Gets all volumes on node(s).
1021 This is a multi-node call.
1024 return self._MultiNodeCall(node_list, "node_volumes", [])
1026 def call_node_demote_from_mc(self, node):
1027 """Demote a node from the master candidate role.
1029 This is a single-node call.
1032 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1035 def call_node_powercycle(self, node, hypervisor):
1036 """Tries to powercycle a node.
1038 This is a single-node call.
1041 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1044 def call_test_delay(self, node_list, duration):
1045 """Sleep for a fixed time on given node(s).
1047 This is a multi-node call.
1050 return self._MultiNodeCall(node_list, "test_delay", [duration])
1052 def call_file_storage_dir_create(self, node, file_storage_dir):
1053 """Create the given file storage directory.
1055 This is a single-node call.
1058 return self._SingleNodeCall(node, "file_storage_dir_create",
1061 def call_file_storage_dir_remove(self, node, file_storage_dir):
1062 """Remove the given file storage directory.
1064 This is a single-node call.
1067 return self._SingleNodeCall(node, "file_storage_dir_remove",
1070 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1071 new_file_storage_dir):
1072 """Rename file storage directory.
1074 This is a single-node call.
1077 return self._SingleNodeCall(node, "file_storage_dir_rename",
1078 [old_file_storage_dir, new_file_storage_dir])
1081 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1082 """Update job queue.
1084 This is a multi-node call.
1087 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1088 [file_name, cls._Compress(content)],
1089 address_list=address_list)
1092 def call_jobqueue_purge(cls, node):
1095 This is a single-node call.
1098 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1101 def call_jobqueue_rename(cls, node_list, address_list, rename):
1102 """Rename a job queue file.
1104 This is a multi-node call.
1107 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1108 address_list=address_list)
1111 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1112 """Set the drain flag on the queue.
1114 This is a multi-node call.
1116 @type node_list: list
1117 @param node_list: the list of nodes to query
1118 @type drain_flag: bool
1119 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1122 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1125 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1126 """Validate the hypervisor params.
1128 This is a multi-node call.
1130 @type node_list: list
1131 @param node_list: the list of nodes to query
1132 @type hvname: string
1133 @param hvname: the hypervisor name
1134 @type hvparams: dict
1135 @param hvparams: the hypervisor parameters to be validated
1138 cluster = self._cfg.GetClusterInfo()
1139 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1140 return self._MultiNodeCall(node_list, "hypervisor_validate_params",