4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
46 import ganeti.http.client
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
61 assert not _http_manager, "RPC module initialized more than once"
63 _http_manager = http.client.HttpClientManager()
67 """Stops the module-global HTTP client manager.
69 Must be called before quitting the program.
75 _http_manager.Shutdown()
79 class RpcResult(object):
82 This class holds an RPC result. It is needed since in multi-node
83 calls we can't raise an exception just because one one out of many
84 failed, and therefore we use this class to encapsulate the result.
86 @ivar data: the data payload, for successfull results, or None
88 @ivar failed: whether the operation failed at transport level (not
89 application level on the remote node)
90 @ivar call: the name of the RPC call
91 @ivar node: the name of the node to which we made the call
92 @ivar offline: whether the operation failed because the node was
93 offline, as opposed to actual failure; offline=True will always
94 imply failed=True, in order to allow simpler checking if
95 the user doesn't care about the exact failure mode
96 @ivar error: the error message if the call failed
99 def __init__(self, data=None, failed=False, offline=False,
100 call=None, node=None):
102 self.offline = offline
107 self.error = "Node is marked offline"
108 self.data = self.payload = None
110 self.error = self._EnsureErr(data)
111 self.data = self.payload = None
114 if not isinstance(self.data, (tuple, list)):
115 self.error = ("RPC layer error: invalid result type (%s)" %
118 self.error = ("RPC layer error: invalid result length (%d), "
119 "expected 2" % len(self.data))
120 elif not self.data[0]:
121 self.error = self._EnsureErr(self.data[1])
125 self.payload = data[1]
129 """Helper to ensure we return a 'True' value for error."""
133 return "No error information"
136 """If the result has failed, raise an OpExecError.
138 This is used so that LU code doesn't have to check for each
139 result, but instead can call this function.
143 raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
144 (self.call, self.node, self.error))
146 def RemoteFailMsg(self):
147 """Check if the remote procedure failed.
149 @return: the fail_msg attribute
158 This class, given a (remote) method name, a list of parameters and a
159 list of nodes, will contact (in parallel) all nodes, and return a
160 dict of results (key: node name, value: result).
162 One current bug is that generic failure is still signalled by
163 'False' result, which is not good. This overloading of values can
167 def __init__(self, procedure, body, port):
168 self.procedure = procedure
174 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
175 ssl_cert_path=constants.SSL_CERT_FILE)
177 def ConnectList(self, node_list, address_list=None):
178 """Add a list of nodes to the target nodes.
180 @type node_list: list
181 @param node_list: the list of node names to connect
182 @type address_list: list or None
183 @keyword address_list: either None or a list with node addresses,
184 which must have the same length as the node list
187 if address_list is None:
188 address_list = [None for _ in node_list]
190 assert len(node_list) == len(address_list), \
191 "Name and address lists should have the same length"
192 for node, address in zip(node_list, address_list):
193 self.ConnectNode(node, address)
195 def ConnectNode(self, name, address=None):
196 """Add a node to the target list.
199 @param name: the node name
201 @keyword address: the node address, if known
208 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
209 "/%s" % self.procedure,
211 ssl_params=self._ssl_params,
212 ssl_verify_peer=True)
214 def GetResults(self):
215 """Call nodes and return results.
218 @return: List of RPC results
221 assert _http_manager, "RPC module not intialized"
223 _http_manager.ExecRequests(self.nc.values())
227 for name, req in self.nc.iteritems():
228 if req.success and req.resp_status_code == http.HTTP_OK:
229 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
230 node=name, call=self.procedure)
233 # TODO: Better error reporting
239 logging.error("RPC error in %s from node %s: %s",
240 self.procedure, name, msg)
241 results[name] = RpcResult(data=msg, failed=True, node=name,
247 class RpcRunner(object):
248 """RPC runner class"""
250 def __init__(self, cfg):
251 """Initialized the rpc runner.
253 @type cfg: C{config.ConfigWriter}
254 @param cfg: the configuration object that will be used to get data
259 self.port = utils.GetNodeDaemonPort()
261 def _InstDict(self, instance, hvp=None, bep=None):
262 """Convert the given instance to a dict.
264 This is done via the instance's ToDict() method and additionally
265 we fill the hvparams with the cluster defaults.
267 @type instance: L{objects.Instance}
268 @param instance: an Instance object
269 @type hvp: dict or None
270 @param hvp: a dictionary with overriden hypervisor parameters
271 @type bep: dict or None
272 @param bep: a dictionary with overriden backend parameters
274 @return: the instance dict, with the hvparams filled with the
278 idict = instance.ToDict()
279 cluster = self._cfg.GetClusterInfo()
280 idict["hvparams"] = cluster.FillHV(instance)
282 idict["hvparams"].update(hvp)
283 idict["beparams"] = cluster.FillBE(instance)
285 idict["beparams"].update(bep)
286 for nic in idict["nics"]:
287 nic['nicparams'] = objects.FillDict(
288 cluster.nicparams[constants.PP_DEFAULT],
292 def _ConnectList(self, client, node_list, call):
293 """Helper for computing node addresses.
295 @type client: L{Client}
296 @param client: a C{Client} instance
297 @type node_list: list
298 @param node_list: the node list we should connect
300 @param call: the name of the remote procedure call, for filling in
301 correctly any eventual offline nodes' results
304 all_nodes = self._cfg.GetAllNodesInfo()
308 for node in node_list:
309 if node in all_nodes:
310 if all_nodes[node].offline:
311 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
313 val = all_nodes[node].primary_ip
316 addr_list.append(val)
317 name_list.append(node)
319 client.ConnectList(name_list, address_list=addr_list)
322 def _ConnectNode(self, client, node, call):
323 """Helper for computing one node's address.
325 @type client: L{Client}
326 @param client: a C{Client} instance
328 @param node: the node we should connect
330 @param call: the name of the remote procedure call, for filling in
331 correctly any eventual offline nodes' results
334 node_info = self._cfg.GetNodeInfo(node)
335 if node_info is not None:
336 if node_info.offline:
337 return RpcResult(node=node, offline=True, call=call)
338 addr = node_info.primary_ip
341 client.ConnectNode(node, address=addr)
343 def _MultiNodeCall(self, node_list, procedure, args):
344 """Helper for making a multi-node call
347 body = serializer.DumpJson(args, indent=False)
348 c = Client(procedure, body, self.port)
349 skip_dict = self._ConnectList(c, node_list, procedure)
350 skip_dict.update(c.GetResults())
354 def _StaticMultiNodeCall(cls, node_list, procedure, args,
356 """Helper for making a multi-node static call
359 body = serializer.DumpJson(args, indent=False)
360 c = Client(procedure, body, utils.GetNodeDaemonPort())
361 c.ConnectList(node_list, address_list=address_list)
362 return c.GetResults()
364 def _SingleNodeCall(self, node, procedure, args):
365 """Helper for making a single-node call
368 body = serializer.DumpJson(args, indent=False)
369 c = Client(procedure, body, self.port)
370 result = self._ConnectNode(c, node, procedure)
372 # we did connect, node is not offline
373 result = c.GetResults()[node]
377 def _StaticSingleNodeCall(cls, node, procedure, args):
378 """Helper for making a single-node static call
381 body = serializer.DumpJson(args, indent=False)
382 c = Client(procedure, body, utils.GetNodeDaemonPort())
384 return c.GetResults()[node]
388 """Compresses a string for transport over RPC.
390 Small amounts of data are not compressed.
395 @return: Encoded data to send
398 # Small amounts of data are not compressed
400 return (constants.RPC_ENCODING_NONE, data)
402 # Compress with zlib and encode in base64
403 return (constants.RPC_ENCODING_ZLIB_BASE64,
404 base64.b64encode(zlib.compress(data, 3)))
410 def call_volume_list(self, node_list, vg_name):
411 """Gets the logical volumes present in a given volume group.
413 This is a multi-node call.
416 return self._MultiNodeCall(node_list, "volume_list", [vg_name])
418 def call_vg_list(self, node_list):
419 """Gets the volume group list.
421 This is a multi-node call.
424 return self._MultiNodeCall(node_list, "vg_list", [])
426 def call_bridges_exist(self, node, bridges_list):
427 """Checks if a node has all the bridges given.
429 This method checks if all bridges given in the bridges_list are
430 present on the remote node, so that an instance that uses interfaces
431 on those bridges can be started.
433 This is a single-node call.
436 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
438 def call_instance_start(self, node, instance, hvp, bep):
439 """Starts an instance.
441 This is a single-node call.
444 idict = self._InstDict(instance, hvp=hvp, bep=bep)
445 return self._SingleNodeCall(node, "instance_start", [idict])
447 def call_instance_shutdown(self, node, instance):
448 """Stops an instance.
450 This is a single-node call.
453 return self._SingleNodeCall(node, "instance_shutdown",
454 [self._InstDict(instance)])
456 def call_migration_info(self, node, instance):
457 """Gather the information necessary to prepare an instance migration.
459 This is a single-node call.
462 @param node: the node on which the instance is currently running
463 @type instance: C{objects.Instance}
464 @param instance: the instance definition
467 return self._SingleNodeCall(node, "migration_info",
468 [self._InstDict(instance)])
470 def call_accept_instance(self, node, instance, info, target):
471 """Prepare a node to accept an instance.
473 This is a single-node call.
476 @param node: the target node for the migration
477 @type instance: C{objects.Instance}
478 @param instance: the instance definition
479 @type info: opaque/hypervisor specific (string/data)
480 @param info: result for the call_migration_info call
482 @param target: target hostname (usually ip address) (on the node itself)
485 return self._SingleNodeCall(node, "accept_instance",
486 [self._InstDict(instance), info, target])
488 def call_finalize_migration(self, node, instance, info, success):
489 """Finalize any target-node migration specific operation.
491 This is called both in case of a successful migration and in case of error
492 (in which case it should abort the migration).
494 This is a single-node call.
497 @param node: the target node for the migration
498 @type instance: C{objects.Instance}
499 @param instance: the instance definition
500 @type info: opaque/hypervisor specific (string/data)
501 @param info: result for the call_migration_info call
502 @type success: boolean
503 @param success: whether the migration was a success or a failure
506 return self._SingleNodeCall(node, "finalize_migration",
507 [self._InstDict(instance), info, success])
509 def call_instance_migrate(self, node, instance, target, live):
510 """Migrate an instance.
512 This is a single-node call.
515 @param node: the node on which the instance is currently running
516 @type instance: C{objects.Instance}
517 @param instance: the instance definition
519 @param target: the target node name
521 @param live: whether the migration should be done live or not (the
522 interpretation of this parameter is left to the hypervisor)
525 return self._SingleNodeCall(node, "instance_migrate",
526 [self._InstDict(instance), target, live])
528 def call_instance_reboot(self, node, instance, reboot_type):
529 """Reboots an instance.
531 This is a single-node call.
534 return self._SingleNodeCall(node, "instance_reboot",
535 [self._InstDict(instance), reboot_type])
537 def call_instance_os_add(self, node, inst, reinstall):
538 """Installs an OS on the given instance.
540 This is a single-node call.
543 return self._SingleNodeCall(node, "instance_os_add",
544 [self._InstDict(inst), reinstall])
546 def call_instance_run_rename(self, node, inst, old_name):
547 """Run the OS rename script for an instance.
549 This is a single-node call.
552 return self._SingleNodeCall(node, "instance_run_rename",
553 [self._InstDict(inst), old_name])
555 def call_instance_info(self, node, instance, hname):
556 """Returns information about a single instance.
558 This is a single-node call.
561 @param node: the list of nodes to query
562 @type instance: string
563 @param instance: the instance name
565 @param hname: the hypervisor type of the instance
568 return self._SingleNodeCall(node, "instance_info", [instance, hname])
570 def call_instance_migratable(self, node, instance):
571 """Checks whether the given instance can be migrated.
573 This is a single-node call.
575 @param node: the node to query
576 @type instance: L{objects.Instance}
577 @param instance: the instance to check
581 return self._SingleNodeCall(node, "instance_migratable",
582 [self._InstDict(instance)])
584 def call_all_instances_info(self, node_list, hypervisor_list):
585 """Returns information about all instances on the given nodes.
587 This is a multi-node call.
589 @type node_list: list
590 @param node_list: the list of nodes to query
591 @type hypervisor_list: list
592 @param hypervisor_list: the hypervisors to query for instances
595 return self._MultiNodeCall(node_list, "all_instances_info",
598 def call_instance_list(self, node_list, hypervisor_list):
599 """Returns the list of running instances on a given node.
601 This is a multi-node call.
603 @type node_list: list
604 @param node_list: the list of nodes to query
605 @type hypervisor_list: list
606 @param hypervisor_list: the hypervisors to query for instances
609 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
611 def call_node_tcp_ping(self, node, source, target, port, timeout,
613 """Do a TcpPing on the remote node
615 This is a single-node call.
618 return self._SingleNodeCall(node, "node_tcp_ping",
619 [source, target, port, timeout,
622 def call_node_has_ip_address(self, node, address):
623 """Checks if a node has the given IP address.
625 This is a single-node call.
628 return self._SingleNodeCall(node, "node_has_ip_address", [address])
630 def call_node_info(self, node_list, vg_name, hypervisor_type):
631 """Return node information.
633 This will return memory information and volume group size and free
636 This is a multi-node call.
638 @type node_list: list
639 @param node_list: the list of nodes to query
640 @type vg_name: C{string}
641 @param vg_name: the name of the volume group to ask for disk space
643 @type hypervisor_type: C{str}
644 @param hypervisor_type: the name of the hypervisor to ask for
648 return self._MultiNodeCall(node_list, "node_info",
649 [vg_name, hypervisor_type])
651 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
652 """Add a node to the cluster.
654 This is a single-node call.
657 return self._SingleNodeCall(node, "node_add",
658 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
660 def call_node_verify(self, node_list, checkdict, cluster_name):
661 """Request verification of given parameters.
663 This is a multi-node call.
666 return self._MultiNodeCall(node_list, "node_verify",
667 [checkdict, cluster_name])
670 def call_node_start_master(cls, node, start_daemons):
671 """Tells a node to activate itself as a master.
673 This is a single-node call.
676 return cls._StaticSingleNodeCall(node, "node_start_master",
680 def call_node_stop_master(cls, node, stop_daemons):
681 """Tells a node to demote itself from master status.
683 This is a single-node call.
686 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
689 def call_master_info(cls, node_list):
690 """Query master info.
692 This is a multi-node call.
695 # TODO: should this method query down nodes?
696 return cls._StaticMultiNodeCall(node_list, "master_info", [])
698 def call_version(self, node_list):
699 """Query node version.
701 This is a multi-node call.
704 return self._MultiNodeCall(node_list, "version", [])
706 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
707 """Request creation of a given block device.
709 This is a single-node call.
712 return self._SingleNodeCall(node, "blockdev_create",
713 [bdev.ToDict(), size, owner, on_primary, info])
715 def call_blockdev_remove(self, node, bdev):
716 """Request removal of a given block device.
718 This is a single-node call.
721 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
723 def call_blockdev_rename(self, node, devlist):
724 """Request rename of the given block devices.
726 This is a single-node call.
729 return self._SingleNodeCall(node, "blockdev_rename",
730 [(d.ToDict(), uid) for d, uid in devlist])
732 def call_blockdev_assemble(self, node, disk, owner, on_primary):
733 """Request assembling of a given block device.
735 This is a single-node call.
738 return self._SingleNodeCall(node, "blockdev_assemble",
739 [disk.ToDict(), owner, on_primary])
741 def call_blockdev_shutdown(self, node, disk):
742 """Request shutdown of a given block device.
744 This is a single-node call.
747 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
749 def call_blockdev_addchildren(self, node, bdev, ndevs):
750 """Request adding a list of children to a (mirroring) device.
752 This is a single-node call.
755 return self._SingleNodeCall(node, "blockdev_addchildren",
757 [disk.ToDict() for disk in ndevs]])
759 def call_blockdev_removechildren(self, node, bdev, ndevs):
760 """Request removing a list of children from a (mirroring) device.
762 This is a single-node call.
765 return self._SingleNodeCall(node, "blockdev_removechildren",
767 [disk.ToDict() for disk in ndevs]])
769 def call_blockdev_getmirrorstatus(self, node, disks):
770 """Request status of a (mirroring) device.
772 This is a single-node call.
775 return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
776 [dsk.ToDict() for dsk in disks])
778 def call_blockdev_find(self, node, disk):
779 """Request identification of a given block device.
781 This is a single-node call.
784 return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
786 def call_blockdev_close(self, node, instance_name, disks):
787 """Closes the given block devices.
789 This is a single-node call.
792 params = [instance_name, [cf.ToDict() for cf in disks]]
793 return self._SingleNodeCall(node, "blockdev_close", params)
795 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
796 """Disconnects the network of the given drbd devices.
798 This is a multi-node call.
801 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
802 [nodes_ip, [cf.ToDict() for cf in disks]])
804 def call_drbd_attach_net(self, node_list, nodes_ip,
805 disks, instance_name, multimaster):
806 """Disconnects the given drbd devices.
808 This is a multi-node call.
811 return self._MultiNodeCall(node_list, "drbd_attach_net",
812 [nodes_ip, [cf.ToDict() for cf in disks],
813 instance_name, multimaster])
815 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
816 """Waits for the synchronization of drbd devices is complete.
818 This is a multi-node call.
821 return self._MultiNodeCall(node_list, "drbd_wait_sync",
822 [nodes_ip, [cf.ToDict() for cf in disks]])
825 def call_upload_file(cls, node_list, file_name, address_list=None):
828 The node will refuse the operation in case the file is not on the
831 This is a multi-node call.
833 @type node_list: list
834 @param node_list: the list of node names to upload to
836 @param file_name: the filename to upload
837 @type address_list: list or None
838 @keyword address_list: an optional list of node addresses, in order
839 to optimize the RPC speed
842 file_contents = utils.ReadFile(file_name)
843 data = cls._Compress(file_contents)
844 st = os.stat(file_name)
845 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
846 st.st_atime, st.st_mtime]
847 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
848 address_list=address_list)
851 def call_write_ssconf_files(cls, node_list, values):
852 """Write ssconf files.
854 This is a multi-node call.
857 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
859 def call_os_diagnose(self, node_list):
860 """Request a diagnose of OS definitions.
862 This is a multi-node call.
865 return self._MultiNodeCall(node_list, "os_diagnose", [])
867 def call_os_get(self, node, name):
868 """Returns an OS definition.
870 This is a single-node call.
873 result = self._SingleNodeCall(node, "os_get", [name])
874 if not result.failed and isinstance(result.data, dict):
875 result.data = objects.OS.FromDict(result.data)
878 def call_hooks_runner(self, node_list, hpath, phase, env):
879 """Call the hooks runner.
882 - op: the OpCode instance
883 - env: a dictionary with the environment
885 This is a multi-node call.
888 params = [hpath, phase, env]
889 return self._MultiNodeCall(node_list, "hooks_runner", params)
891 def call_iallocator_runner(self, node, name, idata):
892 """Call an iallocator on a remote node
895 - name: the iallocator name
896 - input: the json-encoded input string
898 This is a single-node call.
901 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
903 def call_blockdev_grow(self, node, cf_bdev, amount):
904 """Request a snapshot of the given block device.
906 This is a single-node call.
909 return self._SingleNodeCall(node, "blockdev_grow",
910 [cf_bdev.ToDict(), amount])
912 def call_blockdev_snapshot(self, node, cf_bdev):
913 """Request a snapshot of the given block device.
915 This is a single-node call.
918 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
920 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
922 """Request the export of a given snapshot.
924 This is a single-node call.
927 return self._SingleNodeCall(node, "snapshot_export",
928 [snap_bdev.ToDict(), dest_node,
929 self._InstDict(instance), cluster_name, idx])
931 def call_finalize_export(self, node, instance, snap_disks):
932 """Request the completion of an export operation.
934 This writes the export config file, etc.
936 This is a single-node call.
940 for disk in snap_disks:
941 flat_disks.append(disk.ToDict())
943 return self._SingleNodeCall(node, "finalize_export",
944 [self._InstDict(instance), flat_disks])
946 def call_export_info(self, node, path):
947 """Queries the export information in a given path.
949 This is a single-node call.
952 return self._SingleNodeCall(node, "export_info", [path])
954 def call_instance_os_import(self, node, inst, src_node, src_images,
956 """Request the import of a backup into an instance.
958 This is a single-node call.
961 return self._SingleNodeCall(node, "instance_os_import",
962 [self._InstDict(inst), src_node, src_images,
965 def call_export_list(self, node_list):
966 """Gets the stored exports list.
968 This is a multi-node call.
971 return self._MultiNodeCall(node_list, "export_list", [])
973 def call_export_remove(self, node, export):
974 """Requests removal of a given export.
976 This is a single-node call.
979 return self._SingleNodeCall(node, "export_remove", [export])
982 def call_node_leave_cluster(cls, node):
983 """Requests a node to clean the cluster information it has.
985 This will remove the configuration information from the ganeti data
988 This is a single-node call.
991 return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
993 def call_node_volumes(self, node_list):
994 """Gets all volumes on node(s).
996 This is a multi-node call.
999 return self._MultiNodeCall(node_list, "node_volumes", [])
1001 def call_node_demote_from_mc(self, node):
1002 """Demote a node from the master candidate role.
1004 This is a single-node call.
1007 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1010 def call_node_powercycle(self, node, hypervisor):
1011 """Tries to powercycle a node.
1013 This is a single-node call.
1016 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1019 def call_test_delay(self, node_list, duration):
1020 """Sleep for a fixed time on given node(s).
1022 This is a multi-node call.
1025 return self._MultiNodeCall(node_list, "test_delay", [duration])
1027 def call_file_storage_dir_create(self, node, file_storage_dir):
1028 """Create the given file storage directory.
1030 This is a single-node call.
1033 return self._SingleNodeCall(node, "file_storage_dir_create",
1036 def call_file_storage_dir_remove(self, node, file_storage_dir):
1037 """Remove the given file storage directory.
1039 This is a single-node call.
1042 return self._SingleNodeCall(node, "file_storage_dir_remove",
1045 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1046 new_file_storage_dir):
1047 """Rename file storage directory.
1049 This is a single-node call.
1052 return self._SingleNodeCall(node, "file_storage_dir_rename",
1053 [old_file_storage_dir, new_file_storage_dir])
1056 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1057 """Update job queue.
1059 This is a multi-node call.
1062 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1063 [file_name, cls._Compress(content)],
1064 address_list=address_list)
1067 def call_jobqueue_purge(cls, node):
1070 This is a single-node call.
1073 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1076 def call_jobqueue_rename(cls, node_list, address_list, rename):
1077 """Rename a job queue file.
1079 This is a multi-node call.
1082 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1083 address_list=address_list)
1086 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1087 """Set the drain flag on the queue.
1089 This is a multi-node call.
1091 @type node_list: list
1092 @param node_list: the list of nodes to query
1093 @type drain_flag: bool
1094 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1097 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1100 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1101 """Validate the hypervisor params.
1103 This is a multi-node call.
1105 @type node_list: list
1106 @param node_list: the list of nodes to query
1107 @type hvname: string
1108 @param hvname: the hypervisor name
1109 @type hvparams: dict
1110 @param hvparams: the hypervisor parameters to be validated
1113 cluster = self._cfg.GetClusterInfo()
1114 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1115 return self._MultiNodeCall(node_list, "hypervisor_validate_params",