4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
46 import ganeti.http.client
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
61 assert not _http_manager, "RPC module initialized more than once"
63 _http_manager = http.client.HttpClientManager()
67 """Stops the module-global HTTP client manager.
69 Must be called before quitting the program.
75 _http_manager.Shutdown()
79 class RpcResult(object):
82 This class holds an RPC result. It is needed since in multi-node
83 calls we can't raise an exception just because one one out of many
84 failed, and therefore we use this class to encapsulate the result.
86 @ivar data: the data payload, for successfull results, or None
88 @ivar failed: whether the operation failed at RPC level (not
89 application level on the remote node)
90 @ivar call: the name of the RPC call
91 @ivar node: the name of the node to which we made the call
92 @ivar offline: whether the operation failed because the node was
93 offline, as opposed to actual failure; offline=True will always
94 imply failed=True, in order to allow simpler checking if
95 the user doesn't care about the exact failure mode
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.error = "Node is marked offline"
116 """If the result has failed, raise an OpExecError.
118 This is used so that LU code doesn't have to check for each
119 result, but instead can call this function.
123 raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124 (self.call, self.node, self.error))
126 def RemoteFailMsg(self):
127 """Check if the remote procedure failed.
129 This is valid only for RPC calls which return result of the form
130 (status, data | error_msg).
132 @return: empty string for succcess, otherwise an error message
136 """Helper to ensure we return a 'True' value for error."""
140 return "No error information"
143 return _EnsureErr(self.error)
144 if not isinstance(self.data, (tuple, list)):
145 return "Invalid result type (%s)" % type(self.data)
146 if len(self.data) != 2:
147 return "Invalid result length (%d), expected 2" % len(self.data)
149 return _EnsureErr(self.data[1])
156 This class, given a (remote) method name, a list of parameters and a
157 list of nodes, will contact (in parallel) all nodes, and return a
158 dict of results (key: node name, value: result).
160 One current bug is that generic failure is still signalled by
161 'False' result, which is not good. This overloading of values can
165 def __init__(self, procedure, body, port):
166 self.procedure = procedure
172 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173 ssl_cert_path=constants.SSL_CERT_FILE)
175 def ConnectList(self, node_list, address_list=None):
176 """Add a list of nodes to the target nodes.
178 @type node_list: list
179 @param node_list: the list of node names to connect
180 @type address_list: list or None
181 @keyword address_list: either None or a list with node addresses,
182 which must have the same length as the node list
185 if address_list is None:
186 address_list = [None for _ in node_list]
188 assert len(node_list) == len(address_list), \
189 "Name and address lists should have the same length"
190 for node, address in zip(node_list, address_list):
191 self.ConnectNode(node, address)
193 def ConnectNode(self, name, address=None):
194 """Add a node to the target list.
197 @param name: the node name
199 @keyword address: the node address, if known
206 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207 "/%s" % self.procedure,
209 ssl_params=self._ssl_params,
210 ssl_verify_peer=True)
212 def GetResults(self):
213 """Call nodes and return results.
216 @returns: List of RPC results
219 assert _http_manager, "RPC module not intialized"
221 _http_manager.ExecRequests(self.nc.values())
225 for name, req in self.nc.iteritems():
226 if req.success and req.resp_status_code == http.HTTP_OK:
227 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228 node=name, call=self.procedure)
231 # TODO: Better error reporting
237 logging.error("RPC error in %s from node %s: %s",
238 self.procedure, name, msg)
239 results[name] = RpcResult(data=msg, failed=True, node=name,
245 class RpcRunner(object):
246 """RPC runner class"""
248 def __init__(self, cfg):
249 """Initialized the rpc runner.
251 @type cfg: C{config.ConfigWriter}
252 @param cfg: the configuration object that will be used to get data
257 self.port = utils.GetNodeDaemonPort()
259 def _InstDict(self, instance):
260 """Convert the given instance to a dict.
262 This is done via the instance's ToDict() method and additionally
263 we fill the hvparams with the cluster defaults.
265 @type instance: L{objects.Instance}
266 @param instance: an Instance object
268 @return: the instance dict, with the hvparams filled with the
272 idict = instance.ToDict()
273 cluster = self._cfg.GetClusterInfo()
274 idict["hvparams"] = cluster.FillHV(instance)
275 idict["beparams"] = cluster.FillBE(instance)
278 def _ConnectList(self, client, node_list, call):
279 """Helper for computing node addresses.
281 @type client: L{Client}
282 @param client: a C{Client} instance
283 @type node_list: list
284 @param node_list: the node list we should connect
286 @param call: the name of the remote procedure call, for filling in
287 correctly any eventual offline nodes' results
290 all_nodes = self._cfg.GetAllNodesInfo()
294 for node in node_list:
295 if node in all_nodes:
296 if all_nodes[node].offline:
297 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
299 val = all_nodes[node].primary_ip
302 addr_list.append(val)
303 name_list.append(node)
305 client.ConnectList(name_list, address_list=addr_list)
308 def _ConnectNode(self, client, node, call):
309 """Helper for computing one node's address.
311 @type client: L{Client}
312 @param client: a C{Client} instance
314 @param node: the node we should connect
316 @param call: the name of the remote procedure call, for filling in
317 correctly any eventual offline nodes' results
320 node_info = self._cfg.GetNodeInfo(node)
321 if node_info is not None:
322 if node_info.offline:
323 return RpcResult(node=node, offline=True, call=call)
324 addr = node_info.primary_ip
327 client.ConnectNode(node, address=addr)
329 def _MultiNodeCall(self, node_list, procedure, args):
330 """Helper for making a multi-node call
333 body = serializer.DumpJson(args, indent=False)
334 c = Client(procedure, body, self.port)
335 skip_dict = self._ConnectList(c, node_list, procedure)
336 skip_dict.update(c.GetResults())
340 def _StaticMultiNodeCall(cls, node_list, procedure, args,
342 """Helper for making a multi-node static call
345 body = serializer.DumpJson(args, indent=False)
346 c = Client(procedure, body, utils.GetNodeDaemonPort())
347 c.ConnectList(node_list, address_list=address_list)
348 return c.GetResults()
350 def _SingleNodeCall(self, node, procedure, args):
351 """Helper for making a single-node call
354 body = serializer.DumpJson(args, indent=False)
355 c = Client(procedure, body, self.port)
356 result = self._ConnectNode(c, node, procedure)
358 # we did connect, node is not offline
359 result = c.GetResults()[node]
363 def _StaticSingleNodeCall(cls, node, procedure, args):
364 """Helper for making a single-node static call
367 body = serializer.DumpJson(args, indent=False)
368 c = Client(procedure, body, utils.GetNodeDaemonPort())
370 return c.GetResults()[node]
374 """Compresses a string for transport over RPC.
376 Small amounts of data are not compressed.
381 @return: Encoded data to send
384 # Small amounts of data are not compressed
386 return (constants.RPC_ENCODING_NONE, data)
388 # Compress with zlib and encode in base64
389 return (constants.RPC_ENCODING_ZLIB_BASE64,
390 base64.b64encode(zlib.compress(data, 3)))
396 def call_volume_list(self, node_list, vg_name):
397 """Gets the logical volumes present in a given volume group.
399 This is a multi-node call.
402 return self._MultiNodeCall(node_list, "volume_list", [vg_name])
404 def call_vg_list(self, node_list):
405 """Gets the volume group list.
407 This is a multi-node call.
410 return self._MultiNodeCall(node_list, "vg_list", [])
412 def call_bridges_exist(self, node, bridges_list):
413 """Checks if a node has all the bridges given.
415 This method checks if all bridges given in the bridges_list are
416 present on the remote node, so that an instance that uses interfaces
417 on those bridges can be started.
419 This is a single-node call.
422 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
424 def call_instance_start(self, node, instance, extra_args):
425 """Starts an instance.
427 This is a single-node call.
430 return self._SingleNodeCall(node, "instance_start",
431 [self._InstDict(instance), extra_args])
433 def call_instance_shutdown(self, node, instance):
434 """Stops an instance.
436 This is a single-node call.
439 return self._SingleNodeCall(node, "instance_shutdown",
440 [self._InstDict(instance)])
442 def call_migration_info(self, node, instance):
443 """Gather the information necessary to prepare an instance migration.
445 This is a single-node call.
448 @param node: the node on which the instance is currently running
449 @type instance: C{objects.Instance}
450 @param instance: the instance definition
453 return self._SingleNodeCall(node, "migration_info",
454 [self._InstDict(instance)])
456 def call_accept_instance(self, node, instance, info, target):
457 """Prepare a node to accept an instance.
459 This is a single-node call.
462 @param node: the target node for the migration
463 @type instance: C{objects.Instance}
464 @param instance: the instance definition
465 @type info: opaque/hypervisor specific (string/data)
466 @param info: result for the call_migration_info call
468 @param target: target hostname (usually ip address) (on the node itself)
471 return self._SingleNodeCall(node, "accept_instance",
472 [self._InstDict(instance), info, target])
474 def call_finalize_migration(self, node, instance, info, success):
475 """Finalize any target-node migration specific operation.
477 This is called both in case of a successful migration and in case of error
478 (in which case it should abort the migration).
480 This is a single-node call.
483 @param node: the target node for the migration
484 @type instance: C{objects.Instance}
485 @param instance: the instance definition
486 @type info: opaque/hypervisor specific (string/data)
487 @param info: result for the call_migration_info call
488 @type success: boolean
489 @param success: whether the migration was a success or a failure
492 return self._SingleNodeCall(node, "finalize_migration",
493 [self._InstDict(instance), info, success])
495 def call_instance_migrate(self, node, instance, target, live):
496 """Migrate an instance.
498 This is a single-node call.
501 @param node: the node on which the instance is currently running
502 @type instance: C{objects.Instance}
503 @param instance: the instance definition
505 @param target: the target node name
507 @param live: whether the migration should be done live or not (the
508 interpretation of this parameter is left to the hypervisor)
511 return self._SingleNodeCall(node, "instance_migrate",
512 [self._InstDict(instance), target, live])
514 def call_instance_reboot(self, node, instance, reboot_type, extra_args):
515 """Reboots an instance.
517 This is a single-node call.
520 return self._SingleNodeCall(node, "instance_reboot",
521 [self._InstDict(instance), reboot_type,
524 def call_instance_os_add(self, node, inst):
525 """Installs an OS on the given instance.
527 This is a single-node call.
530 return self._SingleNodeCall(node, "instance_os_add",
531 [self._InstDict(inst)])
533 def call_instance_run_rename(self, node, inst, old_name):
534 """Run the OS rename script for an instance.
536 This is a single-node call.
539 return self._SingleNodeCall(node, "instance_run_rename",
540 [self._InstDict(inst), old_name])
542 def call_instance_info(self, node, instance, hname):
543 """Returns information about a single instance.
545 This is a single-node call.
548 @param node: the list of nodes to query
549 @type instance: string
550 @param instance: the instance name
552 @param hname: the hypervisor type of the instance
555 return self._SingleNodeCall(node, "instance_info", [instance, hname])
557 def call_instance_migratable(self, node, instance):
558 """Checks whether the given instance can be migrated.
560 This is a single-node call.
562 @param node: the node to query
563 @type instance: L{objects.Instance}
564 @param instance: the instance to check
568 return self._SingleNodeCall(node, "instance_migratable",
569 [self._InstDict(instance)])
571 def call_all_instances_info(self, node_list, hypervisor_list):
572 """Returns information about all instances on the given nodes.
574 This is a multi-node call.
576 @type node_list: list
577 @param node_list: the list of nodes to query
578 @type hypervisor_list: list
579 @param hypervisor_list: the hypervisors to query for instances
582 return self._MultiNodeCall(node_list, "all_instances_info",
585 def call_instance_list(self, node_list, hypervisor_list):
586 """Returns the list of running instances on a given node.
588 This is a multi-node call.
590 @type node_list: list
591 @param node_list: the list of nodes to query
592 @type hypervisor_list: list
593 @param hypervisor_list: the hypervisors to query for instances
596 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
598 def call_node_tcp_ping(self, node, source, target, port, timeout,
600 """Do a TcpPing on the remote node
602 This is a single-node call.
605 return self._SingleNodeCall(node, "node_tcp_ping",
606 [source, target, port, timeout,
609 def call_node_has_ip_address(self, node, address):
610 """Checks if a node has the given IP address.
612 This is a single-node call.
615 return self._SingleNodeCall(node, "node_has_ip_address", [address])
617 def call_node_info(self, node_list, vg_name, hypervisor_type):
618 """Return node information.
620 This will return memory information and volume group size and free
623 This is a multi-node call.
625 @type node_list: list
626 @param node_list: the list of nodes to query
627 @type vg_name: C{string}
628 @param vg_name: the name of the volume group to ask for disk space
630 @type hypervisor_type: C{str}
631 @param hypervisor_type: the name of the hypervisor to ask for
635 retux = self._MultiNodeCall(node_list, "node_info",
636 [vg_name, hypervisor_type])
638 for result in retux.itervalues():
639 if result.failed or not isinstance(result.data, dict):
644 log_name = "call_node_info"
646 utils.CheckDict(result.data, {
647 'memory_total' : '-',
650 'vg_size' : 'node_unreachable',
655 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
656 """Add a node to the cluster.
658 This is a single-node call.
661 return self._SingleNodeCall(node, "node_add",
662 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
664 def call_node_verify(self, node_list, checkdict, cluster_name):
665 """Request verification of given parameters.
667 This is a multi-node call.
670 return self._MultiNodeCall(node_list, "node_verify",
671 [checkdict, cluster_name])
674 def call_node_start_master(cls, node, start_daemons):
675 """Tells a node to activate itself as a master.
677 This is a single-node call.
680 return cls._StaticSingleNodeCall(node, "node_start_master",
684 def call_node_stop_master(cls, node, stop_daemons):
685 """Tells a node to demote itself from master status.
687 This is a single-node call.
690 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
693 def call_master_info(cls, node_list):
694 """Query master info.
696 This is a multi-node call.
699 # TODO: should this method query down nodes?
700 return cls._StaticMultiNodeCall(node_list, "master_info", [])
702 def call_version(self, node_list):
703 """Query node version.
705 This is a multi-node call.
708 return self._MultiNodeCall(node_list, "version", [])
710 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
711 """Request creation of a given block device.
713 This is a single-node call.
716 return self._SingleNodeCall(node, "blockdev_create",
717 [bdev.ToDict(), size, owner, on_primary, info])
719 def call_blockdev_remove(self, node, bdev):
720 """Request removal of a given block device.
722 This is a single-node call.
725 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
727 def call_blockdev_rename(self, node, devlist):
728 """Request rename of the given block devices.
730 This is a single-node call.
733 return self._SingleNodeCall(node, "blockdev_rename",
734 [(d.ToDict(), uid) for d, uid in devlist])
736 def call_blockdev_assemble(self, node, disk, owner, on_primary):
737 """Request assembling of a given block device.
739 This is a single-node call.
742 return self._SingleNodeCall(node, "blockdev_assemble",
743 [disk.ToDict(), owner, on_primary])
745 def call_blockdev_shutdown(self, node, disk):
746 """Request shutdown of a given block device.
748 This is a single-node call.
751 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
753 def call_blockdev_addchildren(self, node, bdev, ndevs):
754 """Request adding a list of children to a (mirroring) device.
756 This is a single-node call.
759 return self._SingleNodeCall(node, "blockdev_addchildren",
761 [disk.ToDict() for disk in ndevs]])
763 def call_blockdev_removechildren(self, node, bdev, ndevs):
764 """Request removing a list of children from a (mirroring) device.
766 This is a single-node call.
769 return self._SingleNodeCall(node, "blockdev_removechildren",
771 [disk.ToDict() for disk in ndevs]])
773 def call_blockdev_getmirrorstatus(self, node, disks):
774 """Request status of a (mirroring) device.
776 This is a single-node call.
779 return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
780 [dsk.ToDict() for dsk in disks])
782 def call_blockdev_find(self, node, disk):
783 """Request identification of a given block device.
785 This is a single-node call.
788 return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
790 def call_blockdev_close(self, node, instance_name, disks):
791 """Closes the given block devices.
793 This is a single-node call.
796 params = [instance_name, [cf.ToDict() for cf in disks]]
797 return self._SingleNodeCall(node, "blockdev_close", params)
799 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
800 """Disconnects the network of the given drbd devices.
802 This is a multi-node call.
805 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
806 [nodes_ip, [cf.ToDict() for cf in disks]])
808 def call_drbd_attach_net(self, node_list, nodes_ip,
809 disks, instance_name, multimaster):
810 """Disconnects the given drbd devices.
812 This is a multi-node call.
815 return self._MultiNodeCall(node_list, "drbd_attach_net",
816 [nodes_ip, [cf.ToDict() for cf in disks],
817 instance_name, multimaster])
819 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
820 """Waits for the synchronization of drbd devices is complete.
822 This is a multi-node call.
825 return self._MultiNodeCall(node_list, "drbd_wait_sync",
826 [nodes_ip, [cf.ToDict() for cf in disks]])
829 def call_upload_file(cls, node_list, file_name, address_list=None):
832 The node will refuse the operation in case the file is not on the
835 This is a multi-node call.
837 @type node_list: list
838 @param node_list: the list of node names to upload to
840 @param file_name: the filename to upload
841 @type address_list: list or None
842 @keyword address_list: an optional list of node addresses, in order
843 to optimize the RPC speed
846 file_contents = utils.ReadFile(file_name)
847 data = cls._Compress(file_contents)
848 st = os.stat(file_name)
849 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
850 st.st_atime, st.st_mtime]
851 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
852 address_list=address_list)
855 def call_write_ssconf_files(cls, node_list, values):
856 """Write ssconf files.
858 This is a multi-node call.
861 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
863 def call_os_diagnose(self, node_list):
864 """Request a diagnose of OS definitions.
866 This is a multi-node call.
869 result = self._MultiNodeCall(node_list, "os_diagnose", [])
871 for node_result in result.values():
872 if not node_result.failed and node_result.data:
873 node_result.data = [objects.OS.FromDict(oss)
874 for oss in node_result.data]
877 def call_os_get(self, node, name):
878 """Returns an OS definition.
880 This is a single-node call.
883 result = self._SingleNodeCall(node, "os_get", [name])
884 if not result.failed and isinstance(result.data, dict):
885 result.data = objects.OS.FromDict(result.data)
888 def call_hooks_runner(self, node_list, hpath, phase, env):
889 """Call the hooks runner.
892 - op: the OpCode instance
893 - env: a dictionary with the environment
895 This is a multi-node call.
898 params = [hpath, phase, env]
899 return self._MultiNodeCall(node_list, "hooks_runner", params)
901 def call_iallocator_runner(self, node, name, idata):
902 """Call an iallocator on a remote node
905 - name: the iallocator name
906 - input: the json-encoded input string
908 This is a single-node call.
911 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
913 def call_blockdev_grow(self, node, cf_bdev, amount):
914 """Request a snapshot of the given block device.
916 This is a single-node call.
919 return self._SingleNodeCall(node, "blockdev_grow",
920 [cf_bdev.ToDict(), amount])
922 def call_blockdev_snapshot(self, node, cf_bdev):
923 """Request a snapshot of the given block device.
925 This is a single-node call.
928 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
930 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
932 """Request the export of a given snapshot.
934 This is a single-node call.
937 return self._SingleNodeCall(node, "snapshot_export",
938 [snap_bdev.ToDict(), dest_node,
939 self._InstDict(instance), cluster_name, idx])
941 def call_finalize_export(self, node, instance, snap_disks):
942 """Request the completion of an export operation.
944 This writes the export config file, etc.
946 This is a single-node call.
950 for disk in snap_disks:
951 flat_disks.append(disk.ToDict())
953 return self._SingleNodeCall(node, "finalize_export",
954 [self._InstDict(instance), flat_disks])
956 def call_export_info(self, node, path):
957 """Queries the export information in a given path.
959 This is a single-node call.
962 result = self._SingleNodeCall(node, "export_info", [path])
963 if not result.failed and result.data:
964 result.data = objects.SerializableConfigParser.Loads(str(result.data))
967 def call_instance_os_import(self, node, inst, src_node, src_images,
969 """Request the import of a backup into an instance.
971 This is a single-node call.
974 return self._SingleNodeCall(node, "instance_os_import",
975 [self._InstDict(inst), src_node, src_images,
978 def call_export_list(self, node_list):
979 """Gets the stored exports list.
981 This is a multi-node call.
984 return self._MultiNodeCall(node_list, "export_list", [])
986 def call_export_remove(self, node, export):
987 """Requests removal of a given export.
989 This is a single-node call.
992 return self._SingleNodeCall(node, "export_remove", [export])
995 def call_node_leave_cluster(cls, node):
996 """Requests a node to clean the cluster information it has.
998 This will remove the configuration information from the ganeti data
1001 This is a single-node call.
1004 return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1006 def call_node_volumes(self, node_list):
1007 """Gets all volumes on node(s).
1009 This is a multi-node call.
1012 return self._MultiNodeCall(node_list, "node_volumes", [])
1014 def call_node_demote_from_mc(self, node):
1015 """Demote a node from the master candidate role.
1017 This is a single-node call.
1020 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1022 def call_test_delay(self, node_list, duration):
1023 """Sleep for a fixed time on given node(s).
1025 This is a multi-node call.
1028 return self._MultiNodeCall(node_list, "test_delay", [duration])
1030 def call_file_storage_dir_create(self, node, file_storage_dir):
1031 """Create the given file storage directory.
1033 This is a single-node call.
1036 return self._SingleNodeCall(node, "file_storage_dir_create",
1039 def call_file_storage_dir_remove(self, node, file_storage_dir):
1040 """Remove the given file storage directory.
1042 This is a single-node call.
1045 return self._SingleNodeCall(node, "file_storage_dir_remove",
1048 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1049 new_file_storage_dir):
1050 """Rename file storage directory.
1052 This is a single-node call.
1055 return self._SingleNodeCall(node, "file_storage_dir_rename",
1056 [old_file_storage_dir, new_file_storage_dir])
1059 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1060 """Update job queue.
1062 This is a multi-node call.
1065 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1066 [file_name, cls._Compress(content)],
1067 address_list=address_list)
1070 def call_jobqueue_purge(cls, node):
1073 This is a single-node call.
1076 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1079 def call_jobqueue_rename(cls, node_list, address_list, rename):
1080 """Rename a job queue file.
1082 This is a multi-node call.
1085 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1086 address_list=address_list)
1089 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1090 """Set the drain flag on the queue.
1092 This is a multi-node call.
1094 @type node_list: list
1095 @param node_list: the list of nodes to query
1096 @type drain_flag: bool
1097 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1100 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1103 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1104 """Validate the hypervisor params.
1106 This is a multi-node call.
1108 @type node_list: list
1109 @param node_list: the list of nodes to query
1110 @type hvname: string
1111 @param hvname: the hypervisor name
1112 @type hvparams: dict
1113 @param hvparams: the hypervisor parameters to be validated
1116 cluster = self._cfg.GetClusterInfo()
1117 hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1118 return self._MultiNodeCall(node_list, "hypervisor_validate_params",