4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
calls we can't raise an exception just because one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
87 @ivar failed: whether the operation failed at transport level (not
88 application level on the remote node)
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.fail_msg = "Node is marked offline"
107 self.data = self.payload = None
109 self.fail_msg = self._EnsureErr(data)
110 self.data = self.payload = None
113 if not isinstance(self.data, (tuple, list)):
114 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
117 self.fail_msg = ("RPC layer error: invalid result length (%d), "
118 "expected 2" % len(self.data))
119 elif not self.data[0]:
120 self.fail_msg = self._EnsureErr(self.data[1])
124 self.payload = data[1]
128 """Helper to ensure we return a 'True' value for error."""
132 return "No error information"
134 def Raise(self, msg, prereq=False):
135 """If the result has failed, raise an OpExecError.
137 This is used so that LU code doesn't have to check for each
138 result, but instead can call this function.
141 if not self.fail_msg:
144 if not msg: # one could pass None for default message
145 msg = ("Call '%s' to node '%s' has failed: %s" %
146 (self.call, self.node, self.fail_msg))
148 msg = "%s: %s" % (msg, self.fail_msg)
150 ec = errors.OpPrereqError
152 ec = errors.OpExecError
155 def RemoteFailMsg(self):
156 """Check if the remote procedure failed.
158 @return: the fail_msg attribute
167 This class, given a (remote) method name, a list of parameters and a
168 list of nodes, will contact (in parallel) all nodes, and return a
169 dict of results (key: node name, value: result).
171 One current bug is that generic failure is still signaled by
172 'False' result, which is not good. This overloading of values can
176 def __init__(self, procedure, body, port):
177 self.procedure = procedure
183 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184 ssl_cert_path=constants.SSL_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
    """Register several nodes as targets of this call.

    @type node_list: list
    @param node_list: the names of the nodes to connect
    @type address_list: list or None
    @keyword address_list: either None (no address known for any node)
        or a list of node addresses, of the same length as the node list

    """
    if address_list is None:
        # no addresses known: pair every node name with None
        address_list = [None for _ in node_list]

    assert len(node_list) == len(address_list), \
           "Name and address lists should have the same length"

    for name, addr in zip(node_list, address_list):
        self.ConnectNode(name, addr)
204 def ConnectNode(self, name, address=None):
205 """Add a node to the target list.
208 @param name: the node name
210 @keyword address: the node address, if known
217 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218 "/%s" % self.procedure,
220 ssl_params=self._ssl_params,
221 ssl_verify_peer=True)
223 def GetResults(self):
224 """Call nodes and return results.
227 @return: List of RPC results
230 assert _http_manager, "RPC module not initialized"
232 _http_manager.ExecRequests(self.nc.values())
236 for name, req in self.nc.iteritems():
237 if req.success and req.resp_status_code == http.HTTP_OK:
238 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239 node=name, call=self.procedure)
242 # TODO: Better error reporting
248 logging.error("RPC error in %s from node %s: %s",
249 self.procedure, name, msg)
250 results[name] = RpcResult(data=msg, failed=True, node=name,
256 class RpcRunner(object):
257 """RPC runner class"""
259 def __init__(self, cfg):
260 """Initialized the rpc runner.
262 @type cfg: C{config.ConfigWriter}
263 @param cfg: the configuration object that will be used to get data
268 self.port = utils.GetDaemonPort(constants.NODED)
270 def _InstDict(self, instance, hvp=None, bep=None):
271 """Convert the given instance to a dict.
273 This is done via the instance's ToDict() method and additionally
274 we fill the hvparams with the cluster defaults.
276 @type instance: L{objects.Instance}
277 @param instance: an Instance object
278 @type hvp: dict or None
279 @param hvp: a dictionary with overridden hypervisor parameters
280 @type bep: dict or None
281 @param bep: a dictionary with overridden backend parameters
283 @return: the instance dict, with the hvparams filled with the
287 idict = instance.ToDict()
288 cluster = self._cfg.GetClusterInfo()
289 idict["hvparams"] = cluster.FillHV(instance)
291 idict["hvparams"].update(hvp)
292 idict["beparams"] = cluster.FillBE(instance)
294 idict["beparams"].update(bep)
295 for nic in idict["nics"]:
296 nic['nicparams'] = objects.FillDict(
297 cluster.nicparams[constants.PP_DEFAULT],
301 def _ConnectList(self, client, node_list, call):
302 """Helper for computing node addresses.
304 @type client: L{ganeti.rpc.Client}
305 @param client: a C{Client} instance
306 @type node_list: list
307 @param node_list: the node list we should connect
309 @param call: the name of the remote procedure call, for filling in
310 correctly any eventual offline nodes' results
313 all_nodes = self._cfg.GetAllNodesInfo()
317 for node in node_list:
318 if node in all_nodes:
319 if all_nodes[node].offline:
320 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
322 val = all_nodes[node].primary_ip
325 addr_list.append(val)
326 name_list.append(node)
328 client.ConnectList(name_list, address_list=addr_list)
331 def _ConnectNode(self, client, node, call):
332 """Helper for computing one node's address.
334 @type client: L{ganeti.rpc.Client}
335 @param client: a C{Client} instance
337 @param node: the node we should connect
339 @param call: the name of the remote procedure call, for filling in
340 correctly any eventual offline nodes' results
343 node_info = self._cfg.GetNodeInfo(node)
344 if node_info is not None:
345 if node_info.offline:
346 return RpcResult(node=node, offline=True, call=call)
347 addr = node_info.primary_ip
350 client.ConnectNode(node, address=addr)
def _MultiNodeCall(self, node_list, procedure, args):
    """Run one remote procedure on several nodes.

    The arguments are serialized to JSON, the nodes are connected via
    C{self._ConnectList} (which returns ready-made results for nodes it
    skips) and the results of the actual calls are merged into that
    dictionary.

    @param node_list: the nodes to call
    @param procedure: the name of the remote procedure
    @param args: the arguments of the procedure
    @return: the merged results dictionary

    """
    payload = serializer.DumpJson(args, indent=False)
    client = Client(procedure, payload, self.port)
    results = self._ConnectList(client, node_list, procedure)
    results.update(client.GetResults())
    return results
363 def _StaticMultiNodeCall(cls, node_list, procedure, args,
365 """Helper for making a multi-node static call
368 body = serializer.DumpJson(args, indent=False)
369 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370 c.ConnectList(node_list, address_list=address_list)
371 return c.GetResults()
373 def _SingleNodeCall(self, node, procedure, args):
374 """Helper for making a single-node call
377 body = serializer.DumpJson(args, indent=False)
378 c = Client(procedure, body, self.port)
379 result = self._ConnectNode(c, node, procedure)
381 # we did connect, node is not offline
382 result = c.GetResults()[node]
386 def _StaticSingleNodeCall(cls, node, procedure, args):
387 """Helper for making a single-node static call
390 body = serializer.DumpJson(args, indent=False)
391 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
393 return c.GetResults()[node]
397 """Compresses a string for transport over RPC.
399 Small amounts of data are not compressed.
404 @return: Encoded data to send
407 # Small amounts of data are not compressed
409 return (constants.RPC_ENCODING_NONE, data)
411 # Compress with zlib and encode in base64
412 return (constants.RPC_ENCODING_ZLIB_BASE64,
413 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
    """Query the logical volumes of a volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the name of the volume group

    """
    params = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", params)
def call_vg_list(self, node_list):
    """Query the list of volume groups.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "vg_list", no_params)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Query a list of storage units.

    This is a multi-node call.

    The storage unit name and arguments, the name and the requested
    fields are forwarded unchanged, in this order.

    """
    params = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", params)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Request the modification of a storage unit.

    This is a single-node call.

    The storage unit name and arguments, the name and the changes are
    forwarded unchanged, in this order.

    """
    params = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", params)
def call_bridges_exist(self, node, bridges_list):
    """Ask a node whether it has all the given bridges.

    The remote node is asked to check that every bridge in bridges_list
    is present, so that an instance using interfaces on those bridges
    can be started there.

    This is a single-node call.

    """
    params = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", params)
def call_instance_start(self, node, instance, hvp, bep):
    """Request the start of an instance.

    This is a single-node call.

    @param node: the node on which to start the instance
    @param instance: the instance object
    @param hvp: hypervisor parameter overrides, handed to C{_InstDict}
    @param bep: backend parameter overrides, handed to C{_InstDict}

    """
    return self._SingleNodeCall(node, "instance_start",
                                [self._InstDict(instance, hvp=hvp, bep=bep)])
def call_instance_shutdown(self, node, instance):
    """Request the shutdown of an instance.

    This is a single-node call.

    @param node: the node to call
    @param instance: the instance object

    """
    idict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_shutdown", [idict])
def call_migration_info(self, node, instance):
    """Collect the data needed to prepare an instance migration.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    idict = self._InstDict(instance)
    return self._SingleNodeCall(node, "migration_info", [idict])
def call_accept_instance(self, node, instance, info, target):
    """Ask a node to get ready for receiving an instance.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @param target: target hostname (usually ip address) (on the node itself)

    """
    params = [self._InstDict(instance), info, target]
    return self._SingleNodeCall(node, "accept_instance", params)
def call_finalize_migration(self, node, instance, info, success):
    """Finish the target-node side of a migration.

    This is to be called after both successful and failed migrations;
    in the failure case the remote side should abort the migration.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    params = [self._InstDict(instance), info, success]
    return self._SingleNodeCall(node, "finalize_migration", params)
def call_instance_migrate(self, node, instance, target, live):
    """Request the migration of an instance.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    params = [self._InstDict(instance), target, live]
    return self._SingleNodeCall(node, "instance_migrate", params)
def call_instance_reboot(self, node, instance, reboot_type):
    """Request the reboot of an instance.

    This is a single-node call.

    @param node: the node to call
    @param instance: the instance object
    @param reboot_type: the kind of reboot, forwarded unchanged

    """
    params = [self._InstDict(instance), reboot_type]
    return self._SingleNodeCall(node, "instance_reboot", params)
def call_instance_os_add(self, node, inst, reinstall):
    """Request the installation of an OS on an instance.

    This is a single-node call.

    @param node: the node to call
    @param inst: the instance object
    @param reinstall: forwarded unchanged to the remote procedure

    """
    params = [self._InstDict(inst), reinstall]
    return self._SingleNodeCall(node, "instance_os_add", params)
def call_instance_run_rename(self, node, inst, old_name):
    """Request a run of the OS rename script for an instance.

    This is a single-node call.

    @param node: the node to call
    @param inst: the instance object
    @param old_name: the previous name of the instance

    """
    params = [self._InstDict(inst), old_name]
    return self._SingleNodeCall(node, "instance_run_rename", params)
def call_instance_info(self, node, instance, hname):
    """Query information about a single instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @param hname: the hypervisor type of the instance

    """
    params = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", params)
def call_instance_migratable(self, node, instance):
    """Ask a node whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    idict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_migratable", [idict])
def call_all_instances_info(self, node_list, hypervisor_list):
    """Query information about all instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "all_instances_info", params)
def call_instance_list(self, node_list, hypervisor_list):
    """Query the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", params)
638 def call_node_tcp_ping(self, node, source, target, port, timeout,
640 """Do a TcpPing on the remote node
642 This is a single-node call.
645 return self._SingleNodeCall(node, "node_tcp_ping",
646 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
    """Ask a node whether it has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for

    """
    params = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", params)
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Query node information.

    The remote side is asked for memory information and for the size
    and free space of the given volume group.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for

    """
    params = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", params)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Request the addition of a node to the cluster.

    This is a single-node call.

    The six key arguments are forwarded unchanged, in signature order.

    """
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", params)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Request the verification of the given parameters.

    This is a multi-node call.

    @param node_list: the nodes to call
    @param checkdict: the checks to run, forwarded unchanged
    @param cluster_name: the cluster name, forwarded unchanged

    """
    params = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", params)
def call_node_start_master(cls, node, start_daemons, no_voting):
    """Ask a node to activate itself as a master.

    This is a single-node call. It receives the class as first argument
    and goes through the static single-node helper.

    @param node: the node to call
    @param start_daemons: forwarded unchanged to the remote procedure
    @param no_voting: forwarded unchanged to the remote procedure

    """
    params = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", params)
def call_node_stop_master(cls, node, stop_daemons):
    """Ask a node to demote itself from master status.

    This is a single-node call. It receives the class as first argument
    and goes through the static single-node helper.

    @param node: the node to call
    @param stop_daemons: forwarded unchanged to the remote procedure

    """
    params = [stop_daemons]
    return cls._StaticSingleNodeCall(node, "node_stop_master", params)
def call_master_info(cls, node_list):
    """Query the master information.

    This is a multi-node call. It receives the class as first argument
    and goes through the static multi-node helper.

    @param node_list: the nodes to query

    """
    # TODO: should this method query down nodes?
    no_params = []
    return cls._StaticMultiNodeCall(node_list, "master_info", no_params)
def call_version(self, node_list):
    """Query the version of the given nodes.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "version", no_params)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request the creation of a block device.

    This is a single-node call.

    The device is sent in its ToDict() form; the remaining arguments are
    forwarded unchanged, in signature order.

    """
    params = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", params)
def call_blockdev_remove(self, node, bdev):
    """Request the removal of a block device.

    This is a single-node call.

    @param node: the node to call
    @param bdev: the device, sent in its ToDict() form

    """
    params = [bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_remove", params)
def call_blockdev_rename(self, node, devlist):
    """Request the rename of several block devices.

    This is a single-node call.

    @param node: the node to call
    @param devlist: pairs of (device, unique id); every device is sent
        in its ToDict() form, the id is forwarded unchanged

    """
    params = []
    for dev, uid in devlist:
        params.append((dev.ToDict(), uid))
    return self._SingleNodeCall(node, "blockdev_rename", params)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Request the assembly of a block device.

    This is a single-node call.

    The disk is sent in its ToDict() form; owner and on_primary are
    forwarded unchanged.

    """
    params = [disk.ToDict(), owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", params)
def call_blockdev_shutdown(self, node, disk):
    """Request the shutdown of a block device.

    This is a single-node call.

    @param node: the node to call
    @param disk: the device, sent in its ToDict() form

    """
    params = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_shutdown", params)
def call_blockdev_addchildren(self, node, bdev, ndevs):
    """Request adding children to a (mirroring) device.

    This is a single-node call.

    @param node: the node to call
    @param bdev: the parent device, sent in its ToDict() form
    @param ndevs: the children to add, each sent in its ToDict() form

    """
    children = [disk.ToDict() for disk in ndevs]
    return self._SingleNodeCall(node, "blockdev_addchildren",
                                [bdev.ToDict(), children])
def call_blockdev_removechildren(self, node, bdev, ndevs):
    """Request removing children from a (mirroring) device.

    This is a single-node call.

    @param node: the node to call
    @param bdev: the parent device, sent in its ToDict() form
    @param ndevs: the children to remove, each sent in its ToDict() form

    """
    children = [disk.ToDict() for disk in ndevs]
    return self._SingleNodeCall(node, "blockdev_removechildren",
                                [bdev.ToDict(), children])
def call_blockdev_getmirrorstatus(self, node, disks):
    """Request the status of (mirroring) devices.

    This is a single-node call.

    When the call succeeded, every entry of the payload is converted in
    place via L{objects.BlockDevStatus.FromDict}; a failed result is
    returned untouched.

    @param node: the node to query
    @param disks: the devices to query, each sent in its ToDict() form
    @return: the result of the call, with the payload converted on success

    """
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                  [dsk.ToDict() for dsk in disks])
    # "failed" only flags transport-level errors; fail_msg is also set for
    # errors reported by the remote node, for which there is no valid
    # payload to convert
    if not result.fail_msg:
        result.payload = [objects.BlockDevStatus.FromDict(i)
                          for i in result.payload]
    return result
def call_blockdev_find(self, node, disk):
    """Request the identification of a block device.

    This is a single-node call.

    When the call succeeded and returned a payload, the payload is
    converted in place via L{objects.BlockDevStatus.FromDict}; a failed
    result or a None payload is returned untouched.

    @param node: the node to query
    @param disk: the device to look for, sent in its ToDict() form
    @return: the result of the call, with the payload converted on success

    """
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
    # "failed" only flags transport-level errors; fail_msg is also set for
    # errors reported by the remote node, for which there is no valid
    # payload to convert
    if not result.fail_msg and result.payload is not None:
        result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result
def call_blockdev_close(self, node, instance_name, disks):
    """Request closing the given block devices.

    This is a single-node call.

    @param node: the node to call
    @param instance_name: the instance name, forwarded unchanged
    @param disks: the devices to close, each sent in its ToDict() form

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Request disconnecting the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to call
    @param nodes_ip: forwarded unchanged to the remote procedure
    @param disks: the devices, each sent in its ToDict() form

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, disk_dicts])
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Request attaching the given drbd devices to the network.

    This is a multi-node call.

    @param node_list: the nodes to call
    @param nodes_ip: forwarded unchanged to the remote procedure
    @param disks: the devices, each sent in its ToDict() form
    @param instance_name: forwarded unchanged to the remote procedure
    @param multimaster: forwarded unchanged to the remote procedure

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    params = [nodes_ip, disk_dicts, instance_name, multimaster]
    return self._MultiNodeCall(node_list, "drbd_attach_net", params)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Wait until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param node_list: the nodes to call
    @param nodes_ip: forwarded unchanged to the remote procedure
    @param disks: the devices, each sent in its ToDict() form

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
                               [nodes_ip, disk_dicts])
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a file to the given nodes.

    The file is read locally and sent (via C{cls._Compress}) together
    with its mode, owner, group and access/modification times as reported
    by os.stat(). It receives the class as first argument and goes
    through the static multi-node helper.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed

    """
    # read (and encode) the contents before stat-ing, as before
    encoded = cls._Compress(utils.ReadFile(file_name))
    info = os.stat(file_name)
    return cls._StaticMultiNodeCall(node_list, "upload_file",
                                    [file_name, encoded,
                                     info.st_mode, info.st_uid, info.st_gid,
                                     info.st_atime, info.st_mtime],
                                    address_list=address_list)
def call_write_ssconf_files(cls, node_list, values):
    """Request writing the ssconf files.

    This is a multi-node call. It receives the class as first argument
    and goes through the static multi-node helper.

    @param node_list: the nodes to call
    @param values: the values to write, forwarded unchanged

    """
    params = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", params)
def call_os_diagnose(self, node_list):
    """Request a diagnose of the OS definitions.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "os_diagnose", no_params)
def call_os_get(self, node, name):
    """Request an OS definition.

    This is a single-node call.

    When the call succeeded and the payload is a dict, the payload is
    converted in place via L{objects.OS.FromDict}; in all other cases the
    result is returned untouched.

    @param node: the node to query
    @param name: the name of the OS
    @return: the result of the call, with the payload converted on success

    """
    result = self._SingleNodeCall(node, "os_get", [name])
    # a well-formed result carries a (status, payload) pair in "data", so
    # the definition has to be looked for in "payload"; fail_msg (unlike
    # "failed") also covers errors reported by the remote node
    if not result.fail_msg and isinstance(result.payload, dict):
        result.payload = objects.OS.FromDict(result.payload)
    return result
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Request a run of the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes to call
    @param hpath: forwarded unchanged to the remote procedure
    @param phase: forwarded unchanged to the remote procedure
    @param env: a dictionary with the environment

    """
    return self._MultiNodeCall(node_list, "hooks_runner",
                               [hpath, phase, env])
def call_iallocator_runner(self, node, name, idata):
    """Request a run of an iallocator on a remote node.

    This is a single-node call.

    @param node: the node to call
    @param name: the iallocator name
    @param idata: the json-encoded input string

    """
    params = [name, idata]
    return self._SingleNodeCall(node, "iallocator_runner", params)
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing the given block device.

    This is a single-node call.

    @param node: the node to call
    @param cf_bdev: the device, sent in its ToDict() form
    @param amount: the amount to grow by, forwarded unchanged

    """
    params = [cf_bdev.ToDict(), amount]
    return self._SingleNodeCall(node, "blockdev_grow", params)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param node: the node to call
    @param cf_bdev: the device, sent in its ToDict() form

    """
    params = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", params)
954 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
956 """Request the export of a given snapshot.
958 This is a single-node call.
961 return self._SingleNodeCall(node, "snapshot_export",
962 [snap_bdev.ToDict(), dest_node,
963 self._InstDict(instance), cluster_name, idx])
def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param node: the node to call
    @param instance: the instance object
    @param snap_disks: the snapshot disks; boolean entries are sent as
        they are, all others in their ToDict() form

    """
    flat_disks = [disk if isinstance(disk, bool) else disk.ToDict()
                  for disk in snap_disks]
    params = [self._InstDict(instance), flat_disks]
    return self._SingleNodeCall(node, "finalize_export", params)
def call_export_info(self, node, path):
    """Query the export information stored in a given path.

    This is a single-node call.

    @param node: the node to query
    @param path: the path of the export

    """
    params = [path]
    return self._SingleNodeCall(node, "export_info", params)
991 def call_instance_os_import(self, node, inst, src_node, src_images,
993 """Request the import of a backup into an instance.
995 This is a single-node call.
998 return self._SingleNodeCall(node, "instance_os_import",
999 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
    """Query the list of stored exports.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "export_list", no_params)
def call_export_remove(self, node, export):
    """Request the removal of a given export.

    This is a single-node call.

    @param node: the node to call
    @param export: the export to remove, forwarded unchanged

    """
    params = [export]
    return self._SingleNodeCall(node, "export_remove", params)
def call_node_leave_cluster(cls, node):
    """Ask a node to clean up the cluster information it holds.

    The remote node is asked to remove its ganeti configuration data.

    This is a single-node call. It receives the class as first argument
    and goes through the static single-node helper.

    @param node: the node to call

    """
    no_params = []
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", no_params)
def call_node_volumes(self, node_list):
    """Query all volumes on the given node(s).

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "node_volumes", no_params)
def call_node_demote_from_mc(self, node):
    """Request the demotion of a node from the master candidate role.

    This is a single-node call.

    @param node: the node to demote

    """
    no_params = []
    return self._SingleNodeCall(node, "node_demote_from_mc", no_params)
def call_node_powercycle(self, node, hypervisor):
    """Request a powercycle of a node.

    This is a single-node call.

    @param node: the node to powercycle
    @param hypervisor: forwarded unchanged to the remote procedure

    """
    params = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", params)
def call_test_delay(self, node_list, duration):
    """Request a sleep of fixed duration on the given node(s).

    This is a multi-node call.

    @param node_list: the nodes to call
    @param duration: how long to sleep, forwarded unchanged

    """
    params = [duration]
    return self._MultiNodeCall(node_list, "test_delay", params)
def call_file_storage_dir_create(self, node, file_storage_dir):
    """Request the creation of a file storage directory.

    This is a single-node call.

    @param node: the node to call
    @param file_storage_dir: the directory to create

    """
    params = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_create", params)
def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Request the removal of a file storage directory.

    This is a single-node call.

    @param node: the node to call
    @param file_storage_dir: the directory to remove

    """
    params = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_remove", params)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Request the rename of a file storage directory.

    This is a single-node call.

    @param node: the node to call
    @param old_file_storage_dir: the current directory
    @param new_file_storage_dir: the new directory

    """
    params = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", params)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Request an update of a job queue file.

    This is a multi-node call. It receives the class as first argument
    and goes through the static multi-node helper; the content is passed
    through C{cls._Compress} before being sent.

    @param node_list: the nodes to call
    @param address_list: the node addresses, handed to the helper
    @param file_name: the name of the queue file
    @param content: the new content of the file

    """
    params = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", params,
                                    address_list=address_list)
def call_jobqueue_purge(cls, node):
    """Request a purge of the job queue.

    This is a single-node call. It receives the class as first argument
    and goes through the static single-node helper.

    @param node: the node to call

    """
    no_params = []
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_params)
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Request the rename of a job queue file.

    This is a multi-node call. It receives the class as first argument
    and goes through the static multi-node helper.

    @param node_list: the nodes to call
    @param address_list: the node addresses, handed to the helper
    @param rename: used directly as the argument list of the remote
        procedure (it is not wrapped in another list)

    """
    proc_args = rename
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", proc_args,
                                    address_list=address_list)
def call_jobqueue_set_drain(cls, node_list, drain_flag):
    """Set or reset the drain flag of the queue.

    This is a multi-node call. It receives the class as first argument
    and goes through the static multi-node helper.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type drain_flag: bool
    @param drain_flag: if True, will set the drain flag, otherwise reset it.

    """
    params = [drain_flag]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain", params)
1137 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1138 """Validate the hypervisor params.
1140 This is a multi-node call.
1142 @type node_list: list
1143 @param node_list: the list of nodes to query
1144 @type hvname: string
1145 @param hvname: the hypervisor name
1146 @type hvparams: dict
1147 @param hvparams: the hypervisor parameters to be validated
1150 cluster = self._cfg.GetClusterInfo()
1151 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1152 return self._MultiNodeCall(node_list, "hypervisor_validate_params",