4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 import ganeti.http.client
48 # Module level variable
53 """Initializes the module-global HTTP client manager.
55 Must be called before using any RPC function.
60 assert not _http_manager, "RPC module initialized more than once"
62 _http_manager = http.client.HttpClientManager()
66 """Stops the module-global HTTP client manager.
68 Must be called before quitting the program.
74 _http_manager.Shutdown()
78 class RpcResult(object):
81 This class holds an RPC result. It is needed since in multi-node
82 calls we can't raise an exception just because one out of many
83 failed, and therefore we use this class to encapsulate the result.
85 @ivar data: the data payload, for successful results, or None
87 @ivar failed: whether the operation failed at transport level (not
88 application level on the remote node)
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.fail_msg = "Node is marked offline"
107 self.data = self.payload = None
109 self.fail_msg = self._EnsureErr(data)
110 self.data = self.payload = None
113 if not isinstance(self.data, (tuple, list)):
115 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
119 self.fail_msg = ("RPC layer error: invalid result length (%d), "
120 "expected 2" % len(self.data))
121 elif not self.data[0]:
123 self.fail_msg = self._EnsureErr(self.data[1])
127 self.payload = data[1]
131 """Helper to ensure we return a 'True' value for error."""
135 return "No error information"
137 def Raise(self, msg, prereq=False):
138 """If the result has failed, raise an OpExecError.
140 This is used so that LU code doesn't have to check for each
141 result, but instead can call this function.
144 if not self.fail_msg:
147 if not msg: # one could pass None for default message
148 msg = ("Call '%s' to node '%s' has failed: %s" %
149 (self.call, self.node, self.fail_msg))
151 msg = "%s: %s" % (msg, self.fail_msg)
153 ec = errors.OpPrereqError
155 ec = errors.OpExecError
158 def RemoteFailMsg(self):
159 """Check if the remote procedure failed.
161 @return: the fail_msg attribute
170 This class, given a (remote) method name, a list of parameters and a
171 list of nodes, will contact (in parallel) all nodes, and return a
172 dict of results (key: node name, value: result).
174 One current bug is that generic failure is still signaled by
175 'False' result, which is not good. This overloading of values can
179 def __init__(self, procedure, body, port):
180 self.procedure = procedure
186 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
187 ssl_cert_path=constants.SSL_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
  """Register several nodes as targets of this call.

  Every name is handed to L{ConnectNode} together with the address
  found at the same position in C{address_list}.

  @type node_list: list
  @param node_list: the list of node names to connect
  @type address_list: list or None
  @keyword address_list: either None (each node is then connected with
      a C{None} address) or a list of node addresses, which must have
      the same length as the node list

  """
  if address_list is not None:
    assert len(node_list) == len(address_list), \
           "Name and address lists should have the same length"
    targets = zip(node_list, address_list)
  else:
    # no addresses known: pair every name with None
    targets = [(name, None) for name in node_list]

  for name, addr in targets:
    self.ConnectNode(name, addr)
207 def ConnectNode(self, name, address=None):
208 """Add a node to the target list.
211 @param name: the node name
213 @keyword address: the node address, if known
220 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
221 "/%s" % self.procedure,
223 ssl_params=self._ssl_params,
224 ssl_verify_peer=True)
226 def GetResults(self):
227 """Call nodes and return results.
230 @return: List of RPC results
233 assert _http_manager, "RPC module not initialized"
235 _http_manager.ExecRequests(self.nc.values())
239 for name, req in self.nc.iteritems():
240 if req.success and req.resp_status_code == http.HTTP_OK:
241 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
242 node=name, call=self.procedure)
245 # TODO: Better error reporting
251 logging.error("RPC error in %s from node %s: %s",
252 self.procedure, name, msg)
253 results[name] = RpcResult(data=msg, failed=True, node=name,
259 class RpcRunner(object):
260 """RPC runner class"""
262 def __init__(self, cfg):
263 """Initialized the rpc runner.
265 @type cfg: C{config.ConfigWriter}
266 @param cfg: the configuration object that will be used to get data
271 self.port = utils.GetDaemonPort(constants.NODED)
273 def _InstDict(self, instance, hvp=None, bep=None):
274 """Convert the given instance to a dict.
276 This is done via the instance's ToDict() method and additionally
277 we fill the hvparams with the cluster defaults.
279 @type instance: L{objects.Instance}
280 @param instance: an Instance object
281 @type hvp: dict or None
282 @param hvp: a dictionary with overridden hypervisor parameters
283 @type bep: dict or None
284 @param bep: a dictionary with overridden backend parameters
286 @return: the instance dict, with the hvparams filled with the
290 idict = instance.ToDict()
291 cluster = self._cfg.GetClusterInfo()
292 idict["hvparams"] = cluster.FillHV(instance)
294 idict["hvparams"].update(hvp)
295 idict["beparams"] = cluster.FillBE(instance)
297 idict["beparams"].update(bep)
298 for nic in idict["nics"]:
299 nic['nicparams'] = objects.FillDict(
300 cluster.nicparams[constants.PP_DEFAULT],
304 def _ConnectList(self, client, node_list, call):
305 """Helper for computing node addresses.
307 @type client: L{ganeti.rpc.Client}
308 @param client: a C{Client} instance
309 @type node_list: list
310 @param node_list: the node list we should connect
312 @param call: the name of the remote procedure call, for filling in
313 correctly any eventual offline nodes' results
316 all_nodes = self._cfg.GetAllNodesInfo()
320 for node in node_list:
321 if node in all_nodes:
322 if all_nodes[node].offline:
323 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
325 val = all_nodes[node].primary_ip
328 addr_list.append(val)
329 name_list.append(node)
331 client.ConnectList(name_list, address_list=addr_list)
334 def _ConnectNode(self, client, node, call):
335 """Helper for computing one node's address.
337 @type client: L{ganeti.rpc.Client}
338 @param client: a C{Client} instance
340 @param node: the node we should connect
342 @param call: the name of the remote procedure call, for filling in
343 correctly any eventual offline nodes' results
346 node_info = self._cfg.GetNodeInfo(node)
347 if node_info is not None:
348 if node_info.offline:
349 return RpcResult(node=node, offline=True, call=call)
350 addr = node_info.primary_ip
353 client.ConnectNode(node, address=addr)
355 def _MultiNodeCall(self, node_list, procedure, args):
356 """Helper for making a multi-node call
359 body = serializer.DumpJson(args, indent=False)
360 c = Client(procedure, body, self.port)
361 skip_dict = self._ConnectList(c, node_list, procedure)
362 skip_dict.update(c.GetResults())
366 def _StaticMultiNodeCall(cls, node_list, procedure, args,
368 """Helper for making a multi-node static call
371 body = serializer.DumpJson(args, indent=False)
372 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
373 c.ConnectList(node_list, address_list=address_list)
374 return c.GetResults()
376 def _SingleNodeCall(self, node, procedure, args):
377 """Helper for making a single-node call
380 body = serializer.DumpJson(args, indent=False)
381 c = Client(procedure, body, self.port)
382 result = self._ConnectNode(c, node, procedure)
384 # we did connect, node is not offline
385 result = c.GetResults()[node]
389 def _StaticSingleNodeCall(cls, node, procedure, args):
390 """Helper for making a single-node static call
393 body = serializer.DumpJson(args, indent=False)
394 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
396 return c.GetResults()[node]
400 """Compresses a string for transport over RPC.
402 Small amounts of data are not compressed.
407 @return: Encoded data to send
410 # Small amounts of data are not compressed
412 return (constants.RPC_ENCODING_NONE, data)
414 # Compress with zlib and encode in base64
415 return (constants.RPC_ENCODING_ZLIB_BASE64,
416 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param vg_name: the volume group name, sent as the only RPC argument
  @return: the result of C{_MultiNodeCall} for the "lv_list" procedure

  """
  rpc_args = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", rpc_args)
def call_vg_list(self, node_list):
  """Gets the volume group list.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{_MultiNodeCall} for the "vg_list" procedure,
      which is invoked without arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "vg_list", no_args)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Get list of storage units.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{_MultiNodeCall} for the "storage_list"
      procedure, called with C{[su_name, su_args, name, fields]}

  """
  rpc_args = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", rpc_args)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Modify a storage unit.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{_SingleNodeCall} for the "storage_modify"
      procedure, called with C{[su_name, su_args, name, changes]}

  """
  rpc_args = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", rpc_args)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Executes an operation on a storage unit.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{_SingleNodeCall} for the "storage_execute"
      procedure, called with C{[su_name, su_args, name, op]}

  """
  rpc_args = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", rpc_args)
def call_bridges_exist(self, node, bridges_list):
  """Checks if a node has all the bridges given.

  This method checks if all bridges given in the bridges_list are
  present on the remote node, so that an instance that uses interfaces
  on those bridges can be started.

  This is a single-node call.

  @param node: the node to check
  @param bridges_list: the bridges to look for, sent as one argument
  @return: the result of C{_SingleNodeCall} for "bridges_exist"

  """
  rpc_args = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", rpc_args)
def call_instance_start(self, node, instance, hvp, bep):
  """Starts an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance, converted through C{_InstDict}
  @param hvp: passed to C{_InstDict} as its C{hvp} keyword
  @param bep: passed to C{_InstDict} as its C{bep} keyword
  @return: the result of C{_SingleNodeCall} for "instance_start"

  """
  rpc_args = [self._InstDict(instance, hvp=hvp, bep=bep)]
  return self._SingleNodeCall(node, "instance_start", rpc_args)
def call_instance_shutdown(self, node, instance):
  """Stops an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance, converted through C{_InstDict}
  @return: the result of C{_SingleNodeCall} for "instance_shutdown"

  """
  idict = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_shutdown", [idict])
def call_migration_info(self, node, instance):
  """Gather the information necessary to prepare an instance migration.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via C{_InstDict}
  @return: the result of C{_SingleNodeCall} for "migration_info"

  """
  idict = self._InstDict(instance)
  return self._SingleNodeCall(node, "migration_info", [idict])
def call_accept_instance(self, node, instance, info, target):
  """Prepare a node to accept an instance.

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via C{_InstDict}
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @param target: target hostname (usually ip address) (on the node itself)
  @return: the result of C{_SingleNodeCall} for "accept_instance"

  """
  rpc_args = [self._InstDict(instance), info, target]
  return self._SingleNodeCall(node, "accept_instance", rpc_args)
def call_finalize_migration(self, node, instance, info, success):
  """Finalize any target-node migration specific operation.

  This is called both in case of a successful migration and in case of
  error (in which case it should abort the migration).

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via C{_InstDict}
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure
  @return: the result of C{_SingleNodeCall} for "finalize_migration"

  """
  rpc_args = [self._InstDict(instance), info, success]
  return self._SingleNodeCall(node, "finalize_migration", rpc_args)
def call_instance_migrate(self, node, instance, target, live):
  """Migrate an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition, converted via C{_InstDict}
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @return: the result of C{_SingleNodeCall} for "instance_migrate"

  """
  rpc_args = [self._InstDict(instance), target, live]
  return self._SingleNodeCall(node, "instance_migrate", rpc_args)
def call_instance_reboot(self, node, instance, reboot_type):
  """Reboots an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance, converted through C{_InstDict}
  @param reboot_type: forwarded unchanged as the second RPC argument
  @return: the result of C{_SingleNodeCall} for "instance_reboot"

  """
  rpc_args = [self._InstDict(instance), reboot_type]
  return self._SingleNodeCall(node, "instance_reboot", rpc_args)
def call_instance_os_add(self, node, inst, reinstall):
  """Installs an OS on the given instance.

  This is a single-node call.

  @param node: the node to contact
  @param inst: the instance, converted through C{_InstDict}
  @param reinstall: forwarded unchanged as the second RPC argument
  @return: the result of C{_SingleNodeCall} for "instance_os_add"

  """
  rpc_args = [self._InstDict(inst), reinstall]
  return self._SingleNodeCall(node, "instance_os_add", rpc_args)
def call_instance_run_rename(self, node, inst, old_name):
  """Run the OS rename script for an instance.

  This is a single-node call.

  @param node: the node to contact
  @param inst: the instance, converted through C{_InstDict}
  @param old_name: forwarded unchanged as the second RPC argument
  @return: the result of C{_SingleNodeCall} for "instance_run_rename"

  """
  rpc_args = [self._InstDict(inst), old_name]
  return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance
  @return: the result of C{_SingleNodeCall} for "instance_info"

  """
  rpc_args = [instance, hname]
  return self._SingleNodeCall(node, "instance_info", rpc_args)
def call_instance_migratable(self, node, instance):
  """Checks whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check, converted via C{_InstDict}
  @return: the result of C{_SingleNodeCall} for "instance_migratable"

  """
  idict = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_migratable", [idict])
623 def call_all_instances_info(self, node_list, hypervisor_list):
624 """Returns information about all instances on the given nodes.
626 This is a multi-node call.
628 @type node_list: list
629 @param node_list: the list of nodes to query
630 @type hypervisor_list: list
631 @param hypervisor_list: the hypervisors to query for instances
634 return self._MultiNodeCall(node_list, "all_instances_info",
def call_instance_list(self, node_list, hypervisor_list):
  """Returns the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances
  @return: the result of C{_MultiNodeCall} for "instance_list"

  """
  rpc_args = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", rpc_args)
650 def call_node_tcp_ping(self, node, source, target, port, timeout,
652 """Do a TcpPing on the remote node
654 This is a single-node call.
657 return self._SingleNodeCall(node, "node_tcp_ping",
658 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
  """Checks if a node has the given IP address.

  This is a single-node call.

  @param node: the node to query
  @param address: the IP address, sent as the only RPC argument
  @return: the result of C{_SingleNodeCall} for "node_has_ip_address"

  """
  rpc_args = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Return node information.

  Per the original description this covers memory information and
  volume group size and free space.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
  @return: the result of C{_MultiNodeCall} for "node_info"

  """
  rpc_args = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", rpc_args)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{_SingleNodeCall} for "node_add", called
      with C{[dsa, dsapub, rsa, rsapub, ssh, sshpub]}

  """
  rpc_args = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  return self._SingleNodeCall(node, "node_add", rpc_args)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Request verification of given parameters.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @return: the result of C{_MultiNodeCall} for "node_verify", called
      with C{[checkdict, cluster_name]}

  """
  rpc_args = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", rpc_args)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tells a node to activate itself as a master.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{cls._StaticSingleNodeCall} for
      "node_start_master", called with C{[start_daemons, no_voting]}

  """
  rpc_args = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
def call_node_stop_master(cls, node, stop_daemons):
  """Tells a node to demote itself from master status.

  This is a single-node call.

  @param node: the node to contact
  @param stop_daemons: forwarded unchanged as the only RPC argument
  @return: the result of C{cls._StaticSingleNodeCall} for
      "node_stop_master"

  """
  rpc_args = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
def call_master_info(cls, node_list):
  """Query master info.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{cls._StaticMultiNodeCall} for "master_info",
      which is invoked without arguments

  """
  # TODO: should this method query down nodes?
  no_args = []
  return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
def call_version(self, node_list):
  """Query node version.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{_MultiNodeCall} for "version", which is
      invoked without arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "version", no_args)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Request creation of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param bdev: the device; its C{ToDict()} form is what gets sent
  @return: the result of C{_SingleNodeCall} for "blockdev_create",
      called with C{[bdev.ToDict(), size, owner, on_primary, info]}

  """
  rpc_args = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", rpc_args)
def call_blockdev_remove(self, node, bdev):
  """Request removal of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param bdev: the device; its C{ToDict()} form is what gets sent
  @return: the result of C{_SingleNodeCall} for "blockdev_remove"

  """
  rpc_args = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
def call_blockdev_rename(self, node, devlist):
  """Request rename of the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param devlist: iterable of (device, uid) pairs; each device is sent
      in its C{ToDict()} form
  @return: the result of C{_SingleNodeCall} for "blockdev_rename"; the
      list of (dict, uid) pairs is itself the argument list

  """
  renames = [(dev.ToDict(), new_uid) for dev, new_uid in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", renames)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Request assembling of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param disk: the device; its C{ToDict()} form is what gets sent
  @return: the result of C{_SingleNodeCall} for "blockdev_assemble",
      called with C{[disk.ToDict(), owner, on_primary]}

  """
  rpc_args = [disk.ToDict(), owner, on_primary]
  return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
def call_blockdev_shutdown(self, node, disk):
  """Request shutdown of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param disk: the device; its C{ToDict()} form is what gets sent
  @return: the result of C{_SingleNodeCall} for "blockdev_shutdown"

  """
  rpc_args = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
788 def call_blockdev_addchildren(self, node, bdev, ndevs):
789 """Request adding a list of children to a (mirroring) device.
791 This is a single-node call.
794 return self._SingleNodeCall(node, "blockdev_addchildren",
796 [disk.ToDict() for disk in ndevs]])
798 def call_blockdev_removechildren(self, node, bdev, ndevs):
799 """Request removing a list of children from a (mirroring) device.
801 This is a single-node call.
804 return self._SingleNodeCall(node, "blockdev_removechildren",
806 [disk.ToDict() for disk in ndevs]])
808 def call_blockdev_getmirrorstatus(self, node, disks):
809 """Request status of a (mirroring) device.
811 This is a single-node call.
814 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
815 [dsk.ToDict() for dsk in disks])
816 if not (result.failed or result.fail_msg):
817 result.payload = [objects.BlockDevStatus.FromDict(i)
818 for i in result.payload]
821 def call_blockdev_find(self, node, disk):
822 """Request identification of a given block device.
824 This is a single-node call.
827 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
828 if not result.failed and result.payload is not None:
829 result.payload = objects.BlockDevStatus.FromDict(result.payload)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param instance_name: forwarded unchanged as the first RPC argument
  @param disks: the devices; each is sent in its C{ToDict()} form
  @return: the result of C{_SingleNodeCall} for "blockdev_close"

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
def call_blockdev_getsizes(self, node, disks):
  """Returns the size of the given disks.

  This is a single-node call.

  @param node: the node to contact
  @param disks: the devices; each is sent in its C{ToDict()} form
  @return: the result of C{_SingleNodeCall} for the remote procedure
      named "blockdev_getsize"

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given drbd devices.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: forwarded unchanged as the first RPC argument
  @param disks: the devices; each is sent in its C{ToDict()} form
  @return: the result of C{_MultiNodeCall} for "drbd_disconnect_net"

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, disk_dicts])
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches the network of the given drbd devices.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: forwarded unchanged as the first RPC argument
  @param disks: the devices; each is sent in its C{ToDict()} form
  @param instance_name: forwarded unchanged as the third RPC argument
  @param multimaster: forwarded unchanged as the fourth RPC argument
  @return: the result of C{_MultiNodeCall} for "drbd_attach_net"

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  rpc_args = [nodes_ip, disk_dicts, instance_name, multimaster]
  return self._MultiNodeCall(node_list, "drbd_attach_net", rpc_args)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of drbd devices is complete.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: forwarded unchanged as the first RPC argument
  @param disks: the devices; each is sent in its C{ToDict()} form
  @return: the result of C{_MultiNodeCall} for "drbd_wait_sync"

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, disk_dicts])
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Upload a local file to the given nodes.

  The file is read with C{utils.ReadFile} and encoded with
  C{cls._Compress}; the mode, uid, gid, atime and mtime reported by
  C{os.stat} are sent along with it. The receiving node may refuse the
  operation.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed
  @return: the result of C{cls._StaticMultiNodeCall} for "upload_file"

  """
  contents = utils.ReadFile(file_name)
  encoded = cls._Compress(contents)
  info = os.stat(file_name)
  rpc_args = [file_name, encoded]
  rpc_args.extend([info.st_mode, info.st_uid, info.st_gid,
                   info.st_atime, info.st_mtime])
  return cls._StaticMultiNodeCall(node_list, "upload_file", rpc_args,
                                  address_list=address_list)
def call_write_ssconf_files(cls, node_list, values):
  """Write ssconf files.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param values: forwarded unchanged as the only RPC argument
  @return: the result of C{cls._StaticMultiNodeCall} for
      "write_ssconf_files"

  """
  rpc_args = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
def call_os_diagnose(self, node_list):
  """Request a diagnose of OS definitions.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{_MultiNodeCall} for "os_diagnose", which is
      invoked without arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  @param node: the node to query
  @param name: the OS name, sent as the only RPC argument
  @return: the result object of the "os_get" call; when the call
      succeeded and its payload is a dictionary, the payload is
      replaced by the object built by C{objects.OS.FromDict}

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  # A well-formed result stores a (status, payload) pair in "data", so
  # the OS dictionary is found in "payload", never in "data" itself;
  # testing "data" for a dict made the conversion unreachable.
  succeeded = not (result.failed or result.fail_msg)
  if succeeded and isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  return result
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param hpath: forwarded unchanged as the first RPC argument
  @param phase: forwarded unchanged as the second RPC argument
  @param env: a dictionary with the environment
  @return: the result of C{_MultiNodeCall} for "hooks_runner"

  """
  return self._MultiNodeCall(node_list, "hooks_runner", [hpath, phase, env])
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @param node: the node to contact
  @param name: the iallocator name
  @param idata: the json-encoded input string
  @return: the result of C{_SingleNodeCall} for "iallocator_runner"

  """
  rpc_args = [name, idata]
  return self._SingleNodeCall(node, "iallocator_runner", rpc_args)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Request growing of the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the device; its C{ToDict()} form is what gets sent
  @param amount: forwarded unchanged as the second RPC argument
  @return: the result of C{_SingleNodeCall} for "blockdev_grow"

  """
  rpc_args = [cf_bdev.ToDict(), amount]
  return self._SingleNodeCall(node, "blockdev_grow", rpc_args)
967 def call_blockdev_export(self, node, cf_bdev,
968 dest_node, dest_path, cluster_name):
969 """Export a given disk to another node.
971 This is a single-node call.
974 return self._SingleNodeCall(node, "blockdev_export",
975 [cf_bdev.ToDict(), dest_node, dest_path,
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the device; its C{ToDict()} form is what gets sent
  @return: the result of C{_SingleNodeCall} for "blockdev_snapshot"

  """
  rpc_args = [cf_bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_snapshot", rpc_args)
986 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
988 """Request the export of a given snapshot.
990 This is a single-node call.
993 return self._SingleNodeCall(node, "snapshot_export",
994 [snap_bdev.ToDict(), dest_node,
995 self._InstDict(instance), cluster_name, idx])
997 def call_finalize_export(self, node, instance, snap_disks):
998 """Request the completion of an export operation.
1000 This writes the export config file, etc.
1002 This is a single-node call.
1006 for disk in snap_disks:
1007 if isinstance(disk, bool):
1008 flat_disks.append(disk)
1010 flat_disks.append(disk.ToDict())
1012 return self._SingleNodeCall(node, "finalize_export",
1013 [self._InstDict(instance), flat_disks])
def call_export_info(self, node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  @param node: the node to query
  @param path: the path, sent as the only RPC argument
  @return: the result of C{_SingleNodeCall} for "export_info"

  """
  rpc_args = [path]
  return self._SingleNodeCall(node, "export_info", rpc_args)
1023 def call_instance_os_import(self, node, inst, src_node, src_images,
1025 """Request the import of a backup into an instance.
1027 This is a single-node call.
1030 return self._SingleNodeCall(node, "instance_os_import",
1031 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
  """Gets the stored exports list.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{_MultiNodeCall} for "export_list", which is
      invoked without arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
def call_export_remove(self, node, export):
  """Requests removal of a given export.

  This is a single-node call.

  @param node: the node to contact
  @param export: the export, sent as the only RPC argument
  @return: the result of C{_SingleNodeCall} for "export_remove"

  """
  rpc_args = [export]
  return self._SingleNodeCall(node, "export_remove", rpc_args)
def call_node_leave_cluster(cls, node):
  """Requests a node to clean the cluster information it has.

  Per the original description this removes the configuration
  information from the ganeti data on that node.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{cls._StaticSingleNodeCall} for
      "node_leave_cluster", which is invoked without arguments

  """
  no_args = []
  return cls._StaticSingleNodeCall(node, "node_leave_cluster", no_args)
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of C{_MultiNodeCall} for "node_volumes", which
      is invoked without arguments

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{_SingleNodeCall} for "node_demote_from_mc",
      which is invoked without arguments

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
def call_node_powercycle(self, node, hypervisor):
  """Tries to powercycle a node.

  This is a single-node call.

  @param node: the node to contact
  @param hypervisor: forwarded unchanged as the only RPC argument
  @return: the result of C{_SingleNodeCall} for "node_powercycle"

  """
  rpc_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", rpc_args)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param duration: forwarded unchanged as the only RPC argument
  @return: the result of C{_MultiNodeCall} for "test_delay"

  """
  rpc_args = [duration]
  return self._MultiNodeCall(node_list, "test_delay", rpc_args)
1096 def call_file_storage_dir_create(self, node, file_storage_dir):
1097 """Create the given file storage directory.
1099 This is a single-node call.
1102 return self._SingleNodeCall(node, "file_storage_dir_create",
1105 def call_file_storage_dir_remove(self, node, file_storage_dir):
1106 """Remove the given file storage directory.
1108 This is a single-node call.
1111 return self._SingleNodeCall(node, "file_storage_dir_remove",
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param old_file_storage_dir: sent as the first RPC argument
  @param new_file_storage_dir: sent as the second RPC argument
  @return: the result of C{_SingleNodeCall} for
      "file_storage_dir_rename"

  """
  rpc_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: passed on as the C{address_list} keyword
  @param file_name: sent as the first RPC argument
  @param content: sent as the second RPC argument after being passed
      through C{cls._Compress}
  @return: the result of C{cls._StaticMultiNodeCall} for
      "jobqueue_update"

  """
  rpc_args = [file_name, cls._Compress(content)]
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                  address_list=address_list)
def call_jobqueue_purge(cls, node):
  """Purge job queue.

  This is a single-node call.

  @param node: the node to contact
  @return: the result of C{cls._StaticSingleNodeCall} for
      "jobqueue_purge", which is invoked without arguments

  """
  no_args = []
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: passed on as the C{address_list} keyword
  @param rename: used as-is as the RPC argument list (it is not
      wrapped in another list)
  @return: the result of C{cls._StaticMultiNodeCall} for
      "jobqueue_rename"

  """
  call_kwargs = {"address_list": address_list}
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  **call_kwargs)
1155 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1156 """Set the drain flag on the queue.
1158 This is a multi-node call.
1160 @type node_list: list
1161 @param node_list: the list of nodes to query
1162 @type drain_flag: bool
1163 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1166 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1169 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1170 """Validate the hypervisor params.
1172 This is a multi-node call.
1174 @type node_list: list
1175 @param node_list: the list of nodes to query
1176 @type hvname: string
1177 @param hvname: the hypervisor name
1178 @type hvparams: dict
1179 @param hvparams: the hypervisor parameters to be validated
1182 cluster = self._cfg.GetClusterInfo()
1183 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1184 return self._MultiNodeCall(node_list, "hypervisor_validate_params",