4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
82 calls we can't raise an exception just because one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
86 @ivar call: the name of the RPC call
87 @ivar node: the name of the node to which we made the call
88 @ivar offline: whether the operation failed because the node was
89 offline, as opposed to actual failure; offline=True will always
90 imply failed=True, in order to allow simpler checking if
91 the user doesn't care about the exact failure mode
92 @ivar fail_msg: the error message if the call failed
95 def __init__(self, data=None, failed=False, offline=False,
96 call=None, node=None):
97 self.offline = offline
101 self.fail_msg = "Node is marked offline"
102 self.data = self.payload = None
104 self.fail_msg = self._EnsureErr(data)
105 self.data = self.payload = None
108 if not isinstance(self.data, (tuple, list)):
109 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
112 self.fail_msg = ("RPC layer error: invalid result length (%d), "
113 "expected 2" % len(self.data))
114 elif not self.data[0]:
115 self.fail_msg = self._EnsureErr(self.data[1])
119 self.payload = data[1]
123 """Helper to ensure we return a 'True' value for error."""
127 return "No error information"
129 def Raise(self, msg, prereq=False):
130 """If the result has failed, raise an OpExecError.
132 This is used so that LU code doesn't have to check for each
133 result, but instead can call this function.
136 if not self.fail_msg:
139 if not msg: # one could pass None for default message
140 msg = ("Call '%s' to node '%s' has failed: %s" %
141 (self.call, self.node, self.fail_msg))
143 msg = "%s: %s" % (msg, self.fail_msg)
145 ec = errors.OpPrereqError
147 ec = errors.OpExecError
150 def RemoteFailMsg(self):
151 """Check if the remote procedure failed.
153 @return: the fail_msg attribute
162 This class, given a (remote) method name, a list of parameters and a
163 list of nodes, will contact (in parallel) all nodes, and return a
164 dict of results (key: node name, value: result).
166 One current bug is that generic failure is still signaled by
167 'False' result, which is not good. This overloading of values can
171 def __init__(self, procedure, body, port):
172 self.procedure = procedure
178 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
179 ssl_cert_path=constants.SSL_CERT_FILE)
181 def ConnectList(self, node_list, address_list=None):
182 """Add a list of nodes to the target nodes.
184 @type node_list: list
185 @param node_list: the list of node names to connect
186 @type address_list: list or None
187 @keyword address_list: either None or a list with node addresses,
188 which must have the same length as the node list
191 if address_list is None:
192 address_list = [None for _ in node_list]
194 assert len(node_list) == len(address_list), \
195 "Name and address lists should have the same length"
196 for node, address in zip(node_list, address_list):
197 self.ConnectNode(node, address)
199 def ConnectNode(self, name, address=None):
200 """Add a node to the target list.
203 @param name: the node name
205 @keyword address: the node address, if known
212 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
213 "/%s" % self.procedure,
215 ssl_params=self._ssl_params,
216 ssl_verify_peer=True)
218 def GetResults(self):
219 """Call nodes and return results.
222 @return: List of RPC results
225 assert _http_manager, "RPC module not initialized"
227 _http_manager.ExecRequests(self.nc.values())
231 for name, req in self.nc.iteritems():
232 if req.success and req.resp_status_code == http.HTTP_OK:
233 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
234 node=name, call=self.procedure)
237 # TODO: Better error reporting
243 logging.error("RPC error in %s from node %s: %s",
244 self.procedure, name, msg)
245 results[name] = RpcResult(data=msg, failed=True, node=name,
251 class RpcRunner(object):
252 """RPC runner class"""
254 def __init__(self, cfg):
255 """Initialize the rpc runner.
257 @type cfg: C{config.ConfigWriter}
258 @param cfg: the configuration object that will be used to get data
263 self.port = utils.GetDaemonPort(constants.NODED)
265 def _InstDict(self, instance, hvp=None, bep=None):
266 """Convert the given instance to a dict.
268 This is done via the instance's ToDict() method and additionally
269 we fill the hvparams with the cluster defaults.
271 @type instance: L{objects.Instance}
272 @param instance: an Instance object
273 @type hvp: dict or None
274 @param hvp: a dictionary with overridden hypervisor parameters
275 @type bep: dict or None
276 @param bep: a dictionary with overridden backend parameters
278 @return: the instance dict, with the hvparams filled with the
282 idict = instance.ToDict()
283 cluster = self._cfg.GetClusterInfo()
284 idict["hvparams"] = cluster.FillHV(instance)
286 idict["hvparams"].update(hvp)
287 idict["beparams"] = cluster.FillBE(instance)
289 idict["beparams"].update(bep)
290 for nic in idict["nics"]:
291 nic['nicparams'] = objects.FillDict(
292 cluster.nicparams[constants.PP_DEFAULT],
296 def _ConnectList(self, client, node_list, call):
297 """Helper for computing node addresses.
299 @type client: L{ganeti.rpc.Client}
300 @param client: a C{Client} instance
301 @type node_list: list
302 @param node_list: the node list we should connect
304 @param call: the name of the remote procedure call, for filling in
305 correctly any eventual offline nodes' results
308 all_nodes = self._cfg.GetAllNodesInfo()
312 for node in node_list:
313 if node in all_nodes:
314 if all_nodes[node].offline:
315 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
317 val = all_nodes[node].primary_ip
320 addr_list.append(val)
321 name_list.append(node)
323 client.ConnectList(name_list, address_list=addr_list)
326 def _ConnectNode(self, client, node, call):
327 """Helper for computing one node's address.
329 @type client: L{ganeti.rpc.Client}
330 @param client: a C{Client} instance
332 @param node: the node we should connect
334 @param call: the name of the remote procedure call, for filling in
335 correctly any eventual offline nodes' results
338 node_info = self._cfg.GetNodeInfo(node)
339 if node_info is not None:
340 if node_info.offline:
341 return RpcResult(node=node, offline=True, call=call)
342 addr = node_info.primary_ip
345 client.ConnectNode(node, address=addr)
347 def _MultiNodeCall(self, node_list, procedure, args):
348 """Helper for making a multi-node call
351 body = serializer.DumpJson(args, indent=False)
352 c = Client(procedure, body, self.port)
353 skip_dict = self._ConnectList(c, node_list, procedure)
354 skip_dict.update(c.GetResults())
358 def _StaticMultiNodeCall(cls, node_list, procedure, args,
360 """Helper for making a multi-node static call
363 body = serializer.DumpJson(args, indent=False)
364 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
365 c.ConnectList(node_list, address_list=address_list)
366 return c.GetResults()
368 def _SingleNodeCall(self, node, procedure, args):
369 """Helper for making a single-node call
372 body = serializer.DumpJson(args, indent=False)
373 c = Client(procedure, body, self.port)
374 result = self._ConnectNode(c, node, procedure)
376 # we did connect, node is not offline
377 result = c.GetResults()[node]
381 def _StaticSingleNodeCall(cls, node, procedure, args):
382 """Helper for making a single-node static call
385 body = serializer.DumpJson(args, indent=False)
386 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
388 return c.GetResults()[node]
392 """Compresses a string for transport over RPC.
394 Small amounts of data are not compressed.
399 @return: Encoded data to send
402 # Small amounts of data are not compressed
404 return (constants.RPC_ENCODING_NONE, data)
406 # Compress with zlib and encode in base64
407 return (constants.RPC_ENCODING_ZLIB_BASE64,
408 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
    """Gets the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the volume group name; sent as the only argument
        of the "lv_list" procedure
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
def call_vg_list(self, node_list):
    """Gets the volume group list.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of L{_MultiNodeCall} for the "vg_list"
        procedure, which is invoked without arguments

    """
    return self._MultiNodeCall(node_list, "vg_list", [])
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Get list of storage units.

    This is a multi-node call.

    The arguments C{su_name}, C{su_args}, C{name} and C{fields} are sent
    to the "storage_list" procedure unchanged, in that order.

    @param node_list: the nodes to query
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    rpc_args = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", rpc_args)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Modify a storage unit.

    This is a single-node call.

    The arguments C{su_name}, C{su_args}, C{name} and C{changes} are
    sent to the "storage_modify" procedure unchanged, in that order.

    @param node: the node to contact
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", rpc_args)
def call_storage_execute(self, node, su_name, su_args, name, op):
    """Executes an operation on a storage unit.

    This is a single-node call.

    The arguments C{su_name}, C{su_args}, C{name} and C{op} are sent to
    the "storage_execute" procedure unchanged, in that order.

    @param node: the node to contact
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", rpc_args)
def call_bridges_exist(self, node, bridges_list):
    """Checks if a node has all the bridges given.

    This method checks if all bridges given in the bridges_list are
    present on the remote node, so that an instance that uses interfaces
    on those bridges can be started.

    This is a single-node call.

    @param node: the node to check
    @param bridges_list: the bridges to look for; sent as one argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
def call_instance_start(self, node, instance, hvp, bep):
    """Starts an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted with L{_InstDict}
    @param hvp: hypervisor parameter overrides handed to L{_InstDict}
    @param bep: backend parameter overrides handed to L{_InstDict}
    @return: the result of L{_SingleNodeCall} for that node

    """
    # The overrides are folded into the instance dict before sending,
    # so the remote side receives a single argument.
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
    return self._SingleNodeCall(node, "instance_start", [idict])
def call_instance_shutdown(self, node, instance):
    """Stops an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted with L{_InstDict}
    @return: the result of L{_SingleNodeCall} for that node

    """
    idict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_shutdown", [idict])
def call_migration_info(self, node, instance):
    """Gather the information necessary to prepare an instance migration.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @return: the result of L{_SingleNodeCall} for that node

    """
    idict = self._InstDict(instance)
    return self._SingleNodeCall(node, "migration_info", [idict])
def call_accept_instance(self, node, instance, info, target):
    """Prepare a node to accept an instance.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @param target: target hostname (usually ip address) (on the node itself)
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [self._InstDict(instance), info, target]
    return self._SingleNodeCall(node, "accept_instance", rpc_args)
def call_finalize_migration(self, node, instance, info, success):
    """Finalize any target-node migration specific operation.

    This is called both in case of a successful migration and in case of
    error (in which case it should abort the migration).

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [self._InstDict(instance), info, success]
    return self._SingleNodeCall(node, "finalize_migration", rpc_args)
def call_instance_migrate(self, node, instance, target, live):
    """Migrate an instance.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [self._InstDict(instance), target, live]
    return self._SingleNodeCall(node, "instance_migrate", rpc_args)
def call_instance_reboot(self, node, instance, reboot_type):
    """Reboots an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted with L{_InstDict}
    @param reboot_type: sent unchanged as the second argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [self._InstDict(instance), reboot_type]
    return self._SingleNodeCall(node, "instance_reboot", rpc_args)
def call_instance_os_add(self, node, inst, reinstall):
    """Installs an OS on the given instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, converted with L{_InstDict}
    @param reinstall: sent unchanged as the second argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [self._InstDict(inst), reinstall]
    return self._SingleNodeCall(node, "instance_os_add", rpc_args)
def call_instance_run_rename(self, node, inst, old_name):
    """Run the OS rename script for an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, converted with L{_InstDict}
    @param old_name: sent unchanged as the second argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [self._InstDict(inst), old_name]
    return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
def call_instance_info(self, node, instance, hname):
    """Returns information about a single instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @param hname: the hypervisor type of the instance
    @return: the result of L{_SingleNodeCall} for that node

    """
    # Unlike most instance calls, only the name is sent here, not the
    # instance dict.
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
def call_instance_migratable(self, node, instance):
    """Checks whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check
    @return: the result of L{_SingleNodeCall} for that node

    """
    idict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_migratable", [idict])
615 def call_all_instances_info(self, node_list, hypervisor_list):
616 """Returns information about all instances on the given nodes.
618 This is a multi-node call.
620 @type node_list: list
621 @param node_list: the list of nodes to query
622 @type hypervisor_list: list
623 @param hypervisor_list: the hypervisors to query for instances
626 return self._MultiNodeCall(node_list, "all_instances_info",
def call_instance_list(self, node_list, hypervisor_list):
    """Returns the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
642 def call_node_tcp_ping(self, node, source, target, port, timeout,
644 """Do a TcpPing on the remote node
646 This is a single-node call.
649 return self._SingleNodeCall(node, "node_tcp_ping",
650 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
    """Checks if a node has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for; sent as the only argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Return node information.

    This will return memory information and volume group size and free
    space.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
        information
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for
        information
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    rpc_args = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", rpc_args)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Add a node to the cluster.

    This is a single-node call.

    The six values C{dsa}, C{dsapub}, C{rsa}, C{rsapub}, C{ssh} and
    C{sshpub} are sent to the "node_add" procedure unchanged, in that
    order (presumably private/public key material - confirm against the
    node daemon).

    @param node: the node to contact
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", rpc_args)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Request verification of given parameters.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param checkdict: what to verify; sent unchanged as first argument
    @param cluster_name: sent unchanged as second argument
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    rpc_args = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", rpc_args)
def call_node_start_master(cls, node, start_daemons, no_voting):
    """Tells a node to activate itself as a master.

    This is a single-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node: the node to contact
    @param start_daemons: sent unchanged as first argument
    @param no_voting: sent unchanged as second argument
    @return: the result of L{_StaticSingleNodeCall} for that node

    """
    rpc_args = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
def call_node_stop_master(cls, node, stop_daemons):
    """Tells a node to demote itself from master status.

    This is a single-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node: the node to contact
    @param stop_daemons: sent unchanged as the only argument
    @return: the result of L{_StaticSingleNodeCall} for that node

    """
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
def call_master_info(cls, node_list):
    """Query master info.

    This is a multi-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node_list: the nodes to query
    @return: the result of L{_StaticMultiNodeCall} for the "master_info"
        procedure, which is invoked without arguments

    """
    # TODO: should this method query down nodes?
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
def call_version(self, node_list):
    """Query node version.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of L{_MultiNodeCall} for the "version"
        procedure, which is invoked without arguments

    """
    return self._MultiNodeCall(node_list, "version", [])
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the device object; serialized with its C{ToDict()}
    @param size: sent unchanged
    @param owner: sent unchanged
    @param on_primary: sent unchanged
    @param info: sent unchanged
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", rpc_args)
def call_blockdev_remove(self, node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the device object; serialized with its C{ToDict()}
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
def call_blockdev_rename(self, node, devlist):
    """Request rename of the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param devlist: iterable of C{(device, uid)} pairs; each device is
        serialized with its C{ToDict()} and the uid is kept as given
    @return: the result of L{_SingleNodeCall} for that node

    """
    # Note: the list of pairs itself is the argument list of the call.
    pairs = [(dev.ToDict(), unique_id) for dev, unique_id in devlist]
    return self._SingleNodeCall(node, "blockdev_rename", pairs)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Request assembling of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the disk object; serialized with its C{ToDict()}
    @param owner: sent unchanged
    @param on_primary: sent unchanged
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [disk.ToDict(), owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
def call_blockdev_shutdown(self, node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the disk object; serialized with its C{ToDict()}
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
780 def call_blockdev_addchildren(self, node, bdev, ndevs):
781 """Request adding a list of children to a (mirroring) device.
783 This is a single-node call.
786 return self._SingleNodeCall(node, "blockdev_addchildren",
788 [disk.ToDict() for disk in ndevs]])
790 def call_blockdev_removechildren(self, node, bdev, ndevs):
791 """Request removing a list of children from a (mirroring) device.
793 This is a single-node call.
796 return self._SingleNodeCall(node, "blockdev_removechildren",
798 [disk.ToDict() for disk in ndevs]])
800 def call_blockdev_getmirrorstatus(self, node, disks):
801 """Request status of a (mirroring) device.
803 This is a single-node call.
806 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
807 [dsk.ToDict() for dsk in disks])
808 if not result.fail_msg:
809 result.payload = [objects.BlockDevStatus.FromDict(i)
810 for i in result.payload]
813 def call_blockdev_find(self, node, disk):
814 """Request identification of a given block device.
816 This is a single-node call.
819 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
820 if not result.fail_msg and result.payload is not None:
821 result.payload = objects.BlockDevStatus.FromDict(result.payload)
def call_blockdev_close(self, node, instance_name, disks):
    """Closes the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param instance_name: sent unchanged as first argument
    @param disks: iterable of disk objects; each is serialized with its
        C{ToDict()} and the resulting list is the second argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
def call_blockdev_getsizes(self, node, disks):
    """Returns the size of the given disks.

    This is a single-node call.

    Note that the remote procedure name is the singular
    "blockdev_getsize", although this method name is plural.

    @param node: the node to query
    @param disks: iterable of disk objects; each is serialized with its
        C{ToDict()} and the resulting list is the only argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnects the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: sent unchanged as first argument
    @param disks: iterable of disk objects; each is serialized with its
        C{ToDict()} and the resulting list is the second argument
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, disk_dicts])
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Attaches the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: sent unchanged as first argument
    @param disks: iterable of disk objects; each is serialized with its
        C{ToDict()} and the resulting list is the second argument
    @param instance_name: sent unchanged as third argument
    @param multimaster: sent unchanged as fourth argument
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_attach_net",
                               [nodes_ip, disk_dicts,
                                instance_name, multimaster])
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Waits until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: sent unchanged as first argument
    @param disks: iterable of disk objects; each is serialized with its
        C{ToDict()} and the resulting list is the second argument
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
                               [nodes_ip, disk_dicts])
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a file to the given nodes.

    The file is read locally, its contents are passed through
    C{cls._Compress}, and they are sent together with the file's mode,
    owner uid/gid and access/modification times as reported by
    C{os.stat}. The receiving node may refuse the operation.

    This is a multi-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed
    @return: the result of L{_StaticMultiNodeCall} for the given nodes

    """
    file_contents = utils.ReadFile(file_name)
    data = cls._Compress(file_contents)
    # The stat is taken after reading, exactly as before; the metadata
    # travels next to the payload in the argument list.
    st = os.stat(file_name)
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
              st.st_atime, st.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                    address_list=address_list)
def call_write_ssconf_files(cls, node_list, values):
    """Write ssconf files.

    This is a multi-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node_list: the nodes to contact
    @param values: sent unchanged as the only argument
    @return: the result of L{_StaticMultiNodeCall} for the given nodes

    """
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
def call_os_diagnose(self, node_list):
    """Request a diagnose of OS definitions.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of L{_MultiNodeCall} for the "os_diagnose"
        procedure, which is invoked without arguments

    """
    return self._MultiNodeCall(node_list, "os_diagnose", [])
914 def call_os_get(self, node, name):
915 """Returns an OS definition.
917 This is a single-node call.
920 result = self._SingleNodeCall(node, "os_get", [name])
921 if not result.fail_msg and isinstance(result.data, dict):
922 result.data = objects.OS.FromDict(result.data)
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes on which to run the hooks
    @param hpath: sent unchanged as first argument
    @param phase: sent unchanged as second argument
    @param env: a dictionary with the environment
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    params = [hpath, phase, env]
    return self._MultiNodeCall(node_list, "hooks_runner", params)
def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: the node to contact
    @param name: the iallocator name
    @param idata: the json-encoded input string
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing of the given block device.

    This is a single-node call.

    @param node: the node to contact
    @param cf_bdev: the device object; serialized with its C{ToDict()}
    @param amount: sent unchanged as second argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [cf_bdev.ToDict(), amount]
    return self._SingleNodeCall(node, "blockdev_grow", rpc_args)
959 def call_blockdev_export(self, node, cf_bdev,
960 dest_node, dest_path, cluster_name):
961 """Export a given disk to another node.
963 This is a single-node call.
966 return self._SingleNodeCall(node, "blockdev_export",
967 [cf_bdev.ToDict(), dest_node, dest_path,
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param node: the node to contact
    @param cf_bdev: the device object; serialized with its C{ToDict()}
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
978 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
980 """Request the export of a given snapshot.
982 This is a single-node call.
985 return self._SingleNodeCall(node, "snapshot_export",
986 [snap_bdev.ToDict(), dest_node,
987 self._InstDict(instance), cluster_name, idx])
989 def call_finalize_export(self, node, instance, snap_disks):
990 """Request the completion of an export operation.
992 This writes the export config file, etc.
994 This is a single-node call.
998 for disk in snap_disks:
999 if isinstance(disk, bool):
1000 flat_disks.append(disk)
1002 flat_disks.append(disk.ToDict())
1004 return self._SingleNodeCall(node, "finalize_export",
1005 [self._InstDict(instance), flat_disks])
def call_export_info(self, node, path):
    """Queries the export information in a given path.

    This is a single-node call.

    @param node: the node to query
    @param path: the path to look at; sent as the only argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "export_info", [path])
1015 def call_instance_os_import(self, node, inst, src_node, src_images,
1017 """Request the import of a backup into an instance.
1019 This is a single-node call.
1022 return self._SingleNodeCall(node, "instance_os_import",
1023 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
    """Gets the stored exports list.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of L{_MultiNodeCall} for the "export_list"
        procedure, which is invoked without arguments

    """
    return self._MultiNodeCall(node_list, "export_list", [])
def call_export_remove(self, node, export):
    """Requests removal of a given export.

    This is a single-node call.

    @param node: the node to contact
    @param export: the export to remove; sent as the only argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "export_remove", [export])
def call_node_leave_cluster(cls, node):
    """Requests a node to clean the cluster information it has.

    This will remove the ganeti configuration information held on that
    node.

    This is a single-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node: the node to contact
    @return: the result of L{_StaticSingleNodeCall} for the
        "node_leave_cluster" procedure, which is invoked without
        arguments

    """
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
def call_node_volumes(self, node_list):
    """Gets all volumes on node(s).

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the result of L{_MultiNodeCall} for the "node_volumes"
        procedure, which is invoked without arguments

    """
    return self._MultiNodeCall(node_list, "node_volumes", [])
def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: the node to demote
    @return: the result of L{_SingleNodeCall} for the
        "node_demote_from_mc" procedure, which is invoked without
        arguments

    """
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
def call_node_powercycle(self, node, hypervisor):
    """Tries to powercycle a node.

    This is a single-node call.

    @param node: the node to powercycle
    @param hypervisor: sent unchanged as the only argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: the nodes that should sleep
    @param duration: how long to sleep; sent as the only argument
    @return: the result of L{_MultiNodeCall} for the given nodes

    """
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1088 def call_file_storage_dir_create(self, node, file_storage_dir):
1089 """Create the given file storage directory.
1091 This is a single-node call.
1094 return self._SingleNodeCall(node, "file_storage_dir_create",
1097 def call_file_storage_dir_remove(self, node, file_storage_dir):
1098 """Remove the given file storage directory.
1100 This is a single-node call.
1103 return self._SingleNodeCall(node, "file_storage_dir_remove",
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename file storage directory.

    This is a single-node call.

    @param node: the node to contact
    @param old_file_storage_dir: the current directory; first argument
    @param new_file_storage_dir: the new directory; second argument
    @return: the result of L{_SingleNodeCall} for that node

    """
    rpc_args = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update job queue.

    This is a multi-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node_list: the nodes to contact
    @param address_list: node addresses, handed to
        L{_StaticMultiNodeCall} as C{address_list}
    @param file_name: sent unchanged as first argument
    @param content: passed through C{cls._Compress} before being sent as
        second argument
    @return: the result of L{_StaticMultiNodeCall} for the given nodes

    """
    rpc_args = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                    address_list=address_list)
def call_jobqueue_purge(cls, node):
    """Purge the job queue on a node.

    This is a single-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node: the node to contact
    @return: the result of L{_StaticSingleNodeCall} for the
        "jobqueue_purge" procedure, which is invoked without arguments

    """
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call. It takes C{cls} and uses the static call
    helper, so it is meant to be bound as a class method.

    @param node_list: the nodes to contact
    @param address_list: node addresses, handed to
        L{_StaticMultiNodeCall} as C{address_list}
    @param rename: used as-is as the complete argument list of the
        "jobqueue_rename" procedure (it is not wrapped in another list)
    @return: the result of L{_StaticMultiNodeCall} for the given nodes

    """
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                    address_list=address_list)
1147 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1148 """Set the drain flag on the queue.
1150 This is a multi-node call.
1152 @type node_list: list
1153 @param node_list: the list of nodes to query
1154 @type drain_flag: bool
1155 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1158 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1161 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1162 """Validate the hypervisor params.
1164 This is a multi-node call.
1166 @type node_list: list
1167 @param node_list: the list of nodes to query
1168 @type hvname: string
1169 @param hvname: the hypervisor name
1170 @type hvparams: dict
1171 @param hvparams: the hypervisor parameters to be validated
1174 cluster = self._cfg.GetClusterInfo()
1175 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1176 return self._MultiNodeCall(node_list, "hypervisor_validate_params",