4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
46 import ganeti.http.client
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
61 assert not _http_manager, "RPC module initialized more than once"
63 _http_manager = http.client.HttpClientManager()
67 """Stops the module-global HTTP client manager.
69 Must be called before quitting the program.
75 _http_manager.Shutdown()
79 class RpcResult(object):
82 This class holds an RPC result. It is needed since in multi-node
83 calls we can't raise an exception just because one one out of many
84 failed, and therefore we use this class to encapsulate the result.
86 @ivar data: the data payload, for successfull results, or None
88 @ivar failed: whether the operation failed at RPC level (not
89 application level on the remote node)
90 @ivar call: the name of the RPC call
91 @ivar node: the name of the node to which we made the call
92 @ivar offline: whether the operation failed because the node was
93 offline, as opposed to actual failure; offline=True will always
94 imply failed=True, in order to allow simpler checking if
95 the user doesn't care about the exact failure mode
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.error = "Node is marked offline"
107 self.data = self.payload = None
110 self.data = self.payload = None
114 if isinstance(data, (tuple, list)) and len(data) == 2:
115 self.payload = data[1]
120 """If the result has failed, raise an OpExecError.
122 This is used so that LU code doesn't have to check for each
123 result, but instead can call this function.
127 raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128 (self.call, self.node, self.error))
130 def RemoteFailMsg(self):
131 """Check if the remote procedure failed.
133 This is valid only for RPC calls which return result of the form
134 (status, data | error_msg).
136 @return: empty string for succcess, otherwise an error message
140 """Helper to ensure we return a 'True' value for error."""
144 return "No error information"
147 return _EnsureErr(self.error)
148 if not isinstance(self.data, (tuple, list)):
149 return "Invalid result type (%s)" % type(self.data)
150 if len(self.data) != 2:
151 return "Invalid result length (%d), expected 2" % len(self.data)
153 return _EnsureErr(self.data[1])
160 This class, given a (remote) method name, a list of parameters and a
161 list of nodes, will contact (in parallel) all nodes, and return a
162 dict of results (key: node name, value: result).
164 One current bug is that generic failure is still signalled by
165 'False' result, which is not good. This overloading of values can
169 def __init__(self, procedure, body, port):
170 self.procedure = procedure
176 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177 ssl_cert_path=constants.SSL_CERT_FILE)
179 def ConnectList(self, node_list, address_list=None):
180 """Add a list of nodes to the target nodes.
182 @type node_list: list
183 @param node_list: the list of node names to connect
184 @type address_list: list or None
185 @keyword address_list: either None or a list with node addresses,
186 which must have the same length as the node list
189 if address_list is None:
190 address_list = [None for _ in node_list]
192 assert len(node_list) == len(address_list), \
193 "Name and address lists should have the same length"
194 for node, address in zip(node_list, address_list):
195 self.ConnectNode(node, address)
197 def ConnectNode(self, name, address=None):
198 """Add a node to the target list.
201 @param name: the node name
203 @keyword address: the node address, if known
210 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211 "/%s" % self.procedure,
213 ssl_params=self._ssl_params,
214 ssl_verify_peer=True)
216 def GetResults(self):
217 """Call nodes and return results.
220 @return: List of RPC results
223 assert _http_manager, "RPC module not intialized"
225 _http_manager.ExecRequests(self.nc.values())
229 for name, req in self.nc.iteritems():
230 if req.success and req.resp_status_code == http.HTTP_OK:
231 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232 node=name, call=self.procedure)
235 # TODO: Better error reporting
241 logging.error("RPC error in %s from node %s: %s",
242 self.procedure, name, msg)
243 results[name] = RpcResult(data=msg, failed=True, node=name,
249 class RpcRunner(object):
250 """RPC runner class"""
252 def __init__(self, cfg):
253 """Initialized the rpc runner.
255 @type cfg: C{config.ConfigWriter}
256 @param cfg: the configuration object that will be used to get data
261 self.port = utils.GetNodeDaemonPort()
263 def _InstDict(self, instance, hvp=None, bep=None):
264 """Convert the given instance to a dict.
266 This is done via the instance's ToDict() method and additionally
267 we fill the hvparams with the cluster defaults.
269 @type instance: L{objects.Instance}
270 @param instance: an Instance object
271 @type hvp: dict or None
272 @param hvp: a dictionary with overriden hypervisor parameters
273 @type bep: dict or None
274 @param bep: a dictionary with overriden backend parameters
276 @return: the instance dict, with the hvparams filled with the
280 idict = instance.ToDict()
281 cluster = self._cfg.GetClusterInfo()
282 idict["hvparams"] = cluster.FillHV(instance)
284 idict["hvparams"].update(hvp)
285 idict["beparams"] = cluster.FillBE(instance)
287 idict["beparams"].update(bep)
290 def _ConnectList(self, client, node_list, call):
291 """Helper for computing node addresses.
293 @type client: L{Client}
294 @param client: a C{Client} instance
295 @type node_list: list
296 @param node_list: the node list we should connect
298 @param call: the name of the remote procedure call, for filling in
299 correctly any eventual offline nodes' results
302 all_nodes = self._cfg.GetAllNodesInfo()
306 for node in node_list:
307 if node in all_nodes:
308 if all_nodes[node].offline:
309 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
311 val = all_nodes[node].primary_ip
314 addr_list.append(val)
315 name_list.append(node)
317 client.ConnectList(name_list, address_list=addr_list)
320 def _ConnectNode(self, client, node, call):
321 """Helper for computing one node's address.
323 @type client: L{Client}
324 @param client: a C{Client} instance
326 @param node: the node we should connect
328 @param call: the name of the remote procedure call, for filling in
329 correctly any eventual offline nodes' results
332 node_info = self._cfg.GetNodeInfo(node)
333 if node_info is not None:
334 if node_info.offline:
335 return RpcResult(node=node, offline=True, call=call)
336 addr = node_info.primary_ip
339 client.ConnectNode(node, address=addr)
341 def _MultiNodeCall(self, node_list, procedure, args):
342 """Helper for making a multi-node call
345 body = serializer.DumpJson(args, indent=False)
346 c = Client(procedure, body, self.port)
347 skip_dict = self._ConnectList(c, node_list, procedure)
348 skip_dict.update(c.GetResults())
352 def _StaticMultiNodeCall(cls, node_list, procedure, args,
354 """Helper for making a multi-node static call
357 body = serializer.DumpJson(args, indent=False)
358 c = Client(procedure, body, utils.GetNodeDaemonPort())
359 c.ConnectList(node_list, address_list=address_list)
360 return c.GetResults()
362 def _SingleNodeCall(self, node, procedure, args):
363 """Helper for making a single-node call
366 body = serializer.DumpJson(args, indent=False)
367 c = Client(procedure, body, self.port)
368 result = self._ConnectNode(c, node, procedure)
370 # we did connect, node is not offline
371 result = c.GetResults()[node]
375 def _StaticSingleNodeCall(cls, node, procedure, args):
376 """Helper for making a single-node static call
379 body = serializer.DumpJson(args, indent=False)
380 c = Client(procedure, body, utils.GetNodeDaemonPort())
382 return c.GetResults()[node]
386 """Compresses a string for transport over RPC.
388 Small amounts of data are not compressed.
393 @return: Encoded data to send
396 # Small amounts of data are not compressed
398 return (constants.RPC_ENCODING_NONE, data)
400 # Compress with zlib and encode in base64
401 return (constants.RPC_ENCODING_ZLIB_BASE64,
402 base64.b64encode(zlib.compress(data, 3)))
408 def call_volume_list(self, node_list, vg_name):
409 """Gets the logical volumes present in a given volume group.
411 This is a multi-node call.
414 return self._MultiNodeCall(node_list, "volume_list", [vg_name])
416 def call_vg_list(self, node_list):
417 """Gets the volume group list.
419 This is a multi-node call.
422 return self._MultiNodeCall(node_list, "vg_list", [])
424 def call_bridges_exist(self, node, bridges_list):
425 """Checks if a node has all the bridges given.
427 This method checks if all bridges given in the bridges_list are
428 present on the remote node, so that an instance that uses interfaces
429 on those bridges can be started.
431 This is a single-node call.
434 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
436 def call_instance_start(self, node, instance, hvp, bep):
437 """Starts an instance.
439 This is a single-node call.
442 idict = self._InstDict(instance, hvp=hvp, bep=bep)
443 return self._SingleNodeCall(node, "instance_start", [idict])
445 def call_instance_shutdown(self, node, instance):
446 """Stops an instance.
448 This is a single-node call.
451 return self._SingleNodeCall(node, "instance_shutdown",
452 [self._InstDict(instance)])
454 def call_migration_info(self, node, instance):
455 """Gather the information necessary to prepare an instance migration.
457 This is a single-node call.
460 @param node: the node on which the instance is currently running
461 @type instance: C{objects.Instance}
462 @param instance: the instance definition
465 return self._SingleNodeCall(node, "migration_info",
466 [self._InstDict(instance)])
468 def call_accept_instance(self, node, instance, info, target):
469 """Prepare a node to accept an instance.
471 This is a single-node call.
474 @param node: the target node for the migration
475 @type instance: C{objects.Instance}
476 @param instance: the instance definition
477 @type info: opaque/hypervisor specific (string/data)
478 @param info: result for the call_migration_info call
480 @param target: target hostname (usually ip address) (on the node itself)
483 return self._SingleNodeCall(node, "accept_instance",
484 [self._InstDict(instance), info, target])
486 def call_finalize_migration(self, node, instance, info, success):
487 """Finalize any target-node migration specific operation.
489 This is called both in case of a successful migration and in case of error
490 (in which case it should abort the migration).
492 This is a single-node call.
495 @param node: the target node for the migration
496 @type instance: C{objects.Instance}
497 @param instance: the instance definition
498 @type info: opaque/hypervisor specific (string/data)
499 @param info: result for the call_migration_info call
500 @type success: boolean
501 @param success: whether the migration was a success or a failure
504 return self._SingleNodeCall(node, "finalize_migration",
505 [self._InstDict(instance), info, success])
507 def call_instance_migrate(self, node, instance, target, live):
508 """Migrate an instance.
510 This is a single-node call.
513 @param node: the node on which the instance is currently running
514 @type instance: C{objects.Instance}
515 @param instance: the instance definition
517 @param target: the target node name
519 @param live: whether the migration should be done live or not (the
520 interpretation of this parameter is left to the hypervisor)
523 return self._SingleNodeCall(node, "instance_migrate",
524 [self._InstDict(instance), target, live])
526 def call_instance_reboot(self, node, instance, reboot_type):
527 """Reboots an instance.
529 This is a single-node call.
532 return self._SingleNodeCall(node, "instance_reboot",
533 [self._InstDict(instance), reboot_type])
535 def call_instance_os_add(self, node, inst, reinstall):
536 """Installs an OS on the given instance.
538 This is a single-node call.
541 return self._SingleNodeCall(node, "instance_os_add",
542 [self._InstDict(inst), reinstall])
544 def call_instance_run_rename(self, node, inst, old_name):
545 """Run the OS rename script for an instance.
547 This is a single-node call.
550 return self._SingleNodeCall(node, "instance_run_rename",
551 [self._InstDict(inst), old_name])
553 def call_instance_info(self, node, instance, hname):
554 """Returns information about a single instance.
556 This is a single-node call.
559 @param node: the list of nodes to query
560 @type instance: string
561 @param instance: the instance name
563 @param hname: the hypervisor type of the instance
566 return self._SingleNodeCall(node, "instance_info", [instance, hname])
568 def call_instance_migratable(self, node, instance):
569 """Checks whether the given instance can be migrated.
571 This is a single-node call.
573 @param node: the node to query
574 @type instance: L{objects.Instance}
575 @param instance: the instance to check
579 return self._SingleNodeCall(node, "instance_migratable",
580 [self._InstDict(instance)])
582 def call_all_instances_info(self, node_list, hypervisor_list):
583 """Returns information about all instances on the given nodes.
585 This is a multi-node call.
587 @type node_list: list
588 @param node_list: the list of nodes to query
589 @type hypervisor_list: list
590 @param hypervisor_list: the hypervisors to query for instances
593 return self._MultiNodeCall(node_list, "all_instances_info",
596 def call_instance_list(self, node_list, hypervisor_list):
597 """Returns the list of running instances on a given node.
599 This is a multi-node call.
601 @type node_list: list
602 @param node_list: the list of nodes to query
603 @type hypervisor_list: list
604 @param hypervisor_list: the hypervisors to query for instances
607 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
609 def call_node_tcp_ping(self, node, source, target, port, timeout,
611 """Do a TcpPing on the remote node
613 This is a single-node call.
616 return self._SingleNodeCall(node, "node_tcp_ping",
617 [source, target, port, timeout,
620 def call_node_has_ip_address(self, node, address):
621 """Checks if a node has the given IP address.
623 This is a single-node call.
626 return self._SingleNodeCall(node, "node_has_ip_address", [address])
628 def call_node_info(self, node_list, vg_name, hypervisor_type):
629 """Return node information.
631 This will return memory information and volume group size and free
634 This is a multi-node call.
636 @type node_list: list
637 @param node_list: the list of nodes to query
638 @type vg_name: C{string}
639 @param vg_name: the name of the volume group to ask for disk space
641 @type hypervisor_type: C{str}
642 @param hypervisor_type: the name of the hypervisor to ask for
646 retux = self._MultiNodeCall(node_list, "node_info",
647 [vg_name, hypervisor_type])
649 for result in retux.itervalues():
650 if result.failed or not isinstance(result.data, dict):
655 log_name = "call_node_info"
657 utils.CheckDict(result.data, {
658 'memory_total' : '-',
661 'vg_size' : 'node_unreachable',
666 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
667 """Add a node to the cluster.
669 This is a single-node call.
672 return self._SingleNodeCall(node, "node_add",
673 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
675 def call_node_verify(self, node_list, checkdict, cluster_name):
676 """Request verification of given parameters.
678 This is a multi-node call.
681 return self._MultiNodeCall(node_list, "node_verify",
682 [checkdict, cluster_name])
685 def call_node_start_master(cls, node, start_daemons):
686 """Tells a node to activate itself as a master.
688 This is a single-node call.
691 return cls._StaticSingleNodeCall(node, "node_start_master",
695 def call_node_stop_master(cls, node, stop_daemons):
696 """Tells a node to demote itself from master status.
698 This is a single-node call.
701 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
704 def call_master_info(cls, node_list):
705 """Query master info.
707 This is a multi-node call.
710 # TODO: should this method query down nodes?
711 return cls._StaticMultiNodeCall(node_list, "master_info", [])
713 def call_version(self, node_list):
714 """Query node version.
716 This is a multi-node call.
719 return self._MultiNodeCall(node_list, "version", [])
721 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
722 """Request creation of a given block device.
724 This is a single-node call.
727 return self._SingleNodeCall(node, "blockdev_create",
728 [bdev.ToDict(), size, owner, on_primary, info])
730 def call_blockdev_remove(self, node, bdev):
731 """Request removal of a given block device.
733 This is a single-node call.
736 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
738 def call_blockdev_rename(self, node, devlist):
739 """Request rename of the given block devices.
741 This is a single-node call.
744 return self._SingleNodeCall(node, "blockdev_rename",
745 [(d.ToDict(), uid) for d, uid in devlist])
747 def call_blockdev_assemble(self, node, disk, owner, on_primary):
748 """Request assembling of a given block device.
750 This is a single-node call.
753 return self._SingleNodeCall(node, "blockdev_assemble",
754 [disk.ToDict(), owner, on_primary])
756 def call_blockdev_shutdown(self, node, disk):
757 """Request shutdown of a given block device.
759 This is a single-node call.
762 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
764 def call_blockdev_addchildren(self, node, bdev, ndevs):
765 """Request adding a list of children to a (mirroring) device.
767 This is a single-node call.
770 return self._SingleNodeCall(node, "blockdev_addchildren",
772 [disk.ToDict() for disk in ndevs]])
774 def call_blockdev_removechildren(self, node, bdev, ndevs):
775 """Request removing a list of children from a (mirroring) device.
777 This is a single-node call.
780 return self._SingleNodeCall(node, "blockdev_removechildren",
782 [disk.ToDict() for disk in ndevs]])
784 def call_blockdev_getmirrorstatus(self, node, disks):
785 """Request status of a (mirroring) device.
787 This is a single-node call.
790 return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
791 [dsk.ToDict() for dsk in disks])
793 def call_blockdev_find(self, node, disk):
794 """Request identification of a given block device.
796 This is a single-node call.
799 return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
801 def call_blockdev_close(self, node, instance_name, disks):
802 """Closes the given block devices.
804 This is a single-node call.
807 params = [instance_name, [cf.ToDict() for cf in disks]]
808 return self._SingleNodeCall(node, "blockdev_close", params)
810 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
811 """Disconnects the network of the given drbd devices.
813 This is a multi-node call.
816 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
817 [nodes_ip, [cf.ToDict() for cf in disks]])
819 def call_drbd_attach_net(self, node_list, nodes_ip,
820 disks, instance_name, multimaster):
821 """Disconnects the given drbd devices.
823 This is a multi-node call.
826 return self._MultiNodeCall(node_list, "drbd_attach_net",
827 [nodes_ip, [cf.ToDict() for cf in disks],
828 instance_name, multimaster])
830 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
831 """Waits for the synchronization of drbd devices is complete.
833 This is a multi-node call.
836 return self._MultiNodeCall(node_list, "drbd_wait_sync",
837 [nodes_ip, [cf.ToDict() for cf in disks]])
840 def call_upload_file(cls, node_list, file_name, address_list=None):
843 The node will refuse the operation in case the file is not on the
846 This is a multi-node call.
848 @type node_list: list
849 @param node_list: the list of node names to upload to
851 @param file_name: the filename to upload
852 @type address_list: list or None
853 @keyword address_list: an optional list of node addresses, in order
854 to optimize the RPC speed
857 file_contents = utils.ReadFile(file_name)
858 data = cls._Compress(file_contents)
859 st = os.stat(file_name)
860 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
861 st.st_atime, st.st_mtime]
862 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
863 address_list=address_list)
866 def call_write_ssconf_files(cls, node_list, values):
867 """Write ssconf files.
869 This is a multi-node call.
872 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
874 def call_os_diagnose(self, node_list):
875 """Request a diagnose of OS definitions.
877 This is a multi-node call.
880 result = self._MultiNodeCall(node_list, "os_diagnose", [])
882 for node_result in result.values():
883 if not node_result.failed and node_result.data:
884 node_result.data = [objects.OS.FromDict(oss)
885 for oss in node_result.data]
888 def call_os_get(self, node, name):
889 """Returns an OS definition.
891 This is a single-node call.
894 result = self._SingleNodeCall(node, "os_get", [name])
895 if not result.failed and isinstance(result.data, dict):
896 result.data = objects.OS.FromDict(result.data)
899 def call_hooks_runner(self, node_list, hpath, phase, env):
900 """Call the hooks runner.
903 - op: the OpCode instance
904 - env: a dictionary with the environment
906 This is a multi-node call.
909 params = [hpath, phase, env]
910 return self._MultiNodeCall(node_list, "hooks_runner", params)
912 def call_iallocator_runner(self, node, name, idata):
913 """Call an iallocator on a remote node
916 - name: the iallocator name
917 - input: the json-encoded input string
919 This is a single-node call.
922 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
924 def call_blockdev_grow(self, node, cf_bdev, amount):
925 """Request a snapshot of the given block device.
927 This is a single-node call.
930 return self._SingleNodeCall(node, "blockdev_grow",
931 [cf_bdev.ToDict(), amount])
933 def call_blockdev_snapshot(self, node, cf_bdev):
934 """Request a snapshot of the given block device.
936 This is a single-node call.
939 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
941 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
943 """Request the export of a given snapshot.
945 This is a single-node call.
948 return self._SingleNodeCall(node, "snapshot_export",
949 [snap_bdev.ToDict(), dest_node,
950 self._InstDict(instance), cluster_name, idx])
952 def call_finalize_export(self, node, instance, snap_disks):
953 """Request the completion of an export operation.
955 This writes the export config file, etc.
957 This is a single-node call.
961 for disk in snap_disks:
962 flat_disks.append(disk.ToDict())
964 return self._SingleNodeCall(node, "finalize_export",
965 [self._InstDict(instance), flat_disks])
967 def call_export_info(self, node, path):
968 """Queries the export information in a given path.
970 This is a single-node call.
973 result = self._SingleNodeCall(node, "export_info", [path])
974 if not result.failed and result.data:
975 result.data = objects.SerializableConfigParser.Loads(str(result.data))
978 def call_instance_os_import(self, node, inst, src_node, src_images,
980 """Request the import of a backup into an instance.
982 This is a single-node call.
985 return self._SingleNodeCall(node, "instance_os_import",
986 [self._InstDict(inst), src_node, src_images,
989 def call_export_list(self, node_list):
990 """Gets the stored exports list.
992 This is a multi-node call.
995 return self._MultiNodeCall(node_list, "export_list", [])
997 def call_export_remove(self, node, export):
998 """Requests removal of a given export.
1000 This is a single-node call.
1003 return self._SingleNodeCall(node, "export_remove", [export])
1006 def call_node_leave_cluster(cls, node):
1007 """Requests a node to clean the cluster information it has.
1009 This will remove the configuration information from the ganeti data
1012 This is a single-node call.
1015 return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1017 def call_node_volumes(self, node_list):
1018 """Gets all volumes on node(s).
1020 This is a multi-node call.
1023 return self._MultiNodeCall(node_list, "node_volumes", [])
1025 def call_node_demote_from_mc(self, node):
1026 """Demote a node from the master candidate role.
1028 This is a single-node call.
1031 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1034 def call_node_powercycle(self, node, hypervisor):
1035 """Tries to powercycle a node.
1037 This is a single-node call.
1040 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1043 def call_test_delay(self, node_list, duration):
1044 """Sleep for a fixed time on given node(s).
1046 This is a multi-node call.
1049 return self._MultiNodeCall(node_list, "test_delay", [duration])
1051 def call_file_storage_dir_create(self, node, file_storage_dir):
1052 """Create the given file storage directory.
1054 This is a single-node call.
1057 return self._SingleNodeCall(node, "file_storage_dir_create",
1060 def call_file_storage_dir_remove(self, node, file_storage_dir):
1061 """Remove the given file storage directory.
1063 This is a single-node call.
1066 return self._SingleNodeCall(node, "file_storage_dir_remove",
1069 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1070 new_file_storage_dir):
1071 """Rename file storage directory.
1073 This is a single-node call.
1076 return self._SingleNodeCall(node, "file_storage_dir_rename",
1077 [old_file_storage_dir, new_file_storage_dir])
1080 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1081 """Update job queue.
1083 This is a multi-node call.
1086 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1087 [file_name, cls._Compress(content)],
1088 address_list=address_list)
1091 def call_jobqueue_purge(cls, node):
1094 This is a single-node call.
1097 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1100 def call_jobqueue_rename(cls, node_list, address_list, rename):
1101 """Rename a job queue file.
1103 This is a multi-node call.
1106 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1107 address_list=address_list)
1110 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1111 """Set the drain flag on the queue.
1113 This is a multi-node call.
1115 @type node_list: list
1116 @param node_list: the list of nodes to query
1117 @type drain_flag: bool
1118 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1121 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1124 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1125 """Validate the hypervisor params.
1127 This is a multi-node call.
1129 @type node_list: list
1130 @param node_list: the list of nodes to query
1131 @type hvname: string
1132 @param hvname: the hypervisor name
1133 @type hvparams: dict
1134 @param hvparams: the hypervisor parameters to be validated
1137 cluster = self._cfg.GetClusterInfo()
1138 hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1139 return self._MultiNodeCall(node_list, "hypervisor_validate_params",