4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client # pylint: disable-msg=W0611
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
59 global _http_manager # pylint: disable-msg=W0603
61 assert not _http_manager, "RPC module initialized more than once"
65 _http_manager = http.client.HttpClientManager()
69 """Stops the module-global HTTP client manager.
71 Must be called before quitting the program.
74 global _http_manager # pylint: disable-msg=W0603
77 _http_manager.Shutdown()
81 class RpcResult(object):
84 This class holds an RPC result. It is needed since in multi-node
85 calls we can't raise an exception just because one one out of many
86 failed, and therefore we use this class to encapsulate the result.
88 @ivar data: the data payload, for successful results, or None
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
100 self.offline = offline
105 self.fail_msg = "Node is marked offline"
106 self.data = self.payload = None
108 self.fail_msg = self._EnsureErr(data)
109 self.data = self.payload = None
112 if not isinstance(self.data, (tuple, list)):
113 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
117 self.fail_msg = ("RPC layer error: invalid result length (%d), "
118 "expected 2" % len(self.data))
120 elif not self.data[0]:
121 self.fail_msg = self._EnsureErr(self.data[1])
126 self.payload = data[1]
128 assert hasattr(self, "call")
129 assert hasattr(self, "data")
130 assert hasattr(self, "fail_msg")
131 assert hasattr(self, "node")
132 assert hasattr(self, "offline")
133 assert hasattr(self, "payload")
137 """Helper to ensure we return a 'True' value for error."""
141 return "No error information"
143 def Raise(self, msg, prereq=False, ecode=None):
144 """If the result has failed, raise an OpExecError.
146 This is used so that LU code doesn't have to check for each
147 result, but instead can call this function.
150 if not self.fail_msg:
153 if not msg: # one could pass None for default message
154 msg = ("Call '%s' to node '%s' has failed: %s" %
155 (self.call, self.node, self.fail_msg))
157 msg = "%s: %s" % (msg, self.fail_msg)
159 ec = errors.OpPrereqError
161 ec = errors.OpExecError
162 if ecode is not None:
166 raise ec(*args) # pylint: disable-msg=W0142
172 This class, given a (remote) method name, a list of parameters and a
173 list of nodes, will contact (in parallel) all nodes, and return a
174 dict of results (key: node name, value: result).
176 One current bug is that generic failure is still signaled by
177 'False' result, which is not good. This overloading of values can
181 def __init__(self, procedure, body, port):
182 self.procedure = procedure
188 http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189 ssl_cert_path=constants.NODED_CERT_FILE)
191 def ConnectList(self, node_list, address_list=None, read_timeout=None):
192 """Add a list of nodes to the target nodes.
194 @type node_list: list
195 @param node_list: the list of node names to connect
196 @type address_list: list or None
197 @keyword address_list: either None or a list with node addresses,
198 which must have the same length as the node list
199 @type read_timeout: int
200 @param read_timeout: overwrites the default read timeout for the
204 if address_list is None:
205 address_list = [None for _ in node_list]
207 assert len(node_list) == len(address_list), \
208 "Name and address lists should have the same length"
209 for node, address in zip(node_list, address_list):
210 self.ConnectNode(node, address, read_timeout=read_timeout)
212 def ConnectNode(self, name, address=None, read_timeout=None):
213 """Add a node to the target list.
216 @param name: the node name
218 @keyword address: the node address, if known
225 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
226 "/%s" % self.procedure,
228 ssl_params=self._ssl_params,
229 ssl_verify_peer=True,
230 read_timeout=read_timeout)
232 def GetResults(self):
233 """Call nodes and return results.
236 @return: List of RPC results
239 assert _http_manager, "RPC module not initialized"
241 _http_manager.ExecRequests(self.nc.values())
245 for name, req in self.nc.iteritems():
246 if req.success and req.resp_status_code == http.HTTP_OK:
247 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
248 node=name, call=self.procedure)
251 # TODO: Better error reporting
257 logging.error("RPC error in %s from node %s: %s",
258 self.procedure, name, msg)
259 results[name] = RpcResult(data=msg, failed=True, node=name,
265 def _EncodeImportExportIO(ieio, ieioargs):
266 """Encodes import/export I/O information.
269 if ieio == constants.IEIO_RAW_DISK:
270 assert len(ieioargs) == 1
271 return (ieioargs[0].ToDict(), )
273 if ieio == constants.IEIO_SCRIPT:
274 assert len(ieioargs) == 2
275 return (ieioargs[0].ToDict(), ieioargs[1])
280 class RpcRunner(object):
281 """RPC runner class"""
283 def __init__(self, cfg):
284 """Initialized the rpc runner.
286 @type cfg: C{config.ConfigWriter}
287 @param cfg: the configuration object that will be used to get data
292 self.port = utils.GetDaemonPort(constants.NODED)
294 def _InstDict(self, instance, hvp=None, bep=None):
295 """Convert the given instance to a dict.
297 This is done via the instance's ToDict() method and additionally
298 we fill the hvparams with the cluster defaults.
300 @type instance: L{objects.Instance}
301 @param instance: an Instance object
302 @type hvp: dict or None
303 @param hvp: a dictionary with overridden hypervisor parameters
304 @type bep: dict or None
305 @param bep: a dictionary with overridden backend parameters
307 @return: the instance dict, with the hvparams filled with the
311 idict = instance.ToDict()
312 cluster = self._cfg.GetClusterInfo()
313 idict["hvparams"] = cluster.FillHV(instance)
315 idict["hvparams"].update(hvp)
316 idict["beparams"] = cluster.FillBE(instance)
318 idict["beparams"].update(bep)
319 for nic in idict["nics"]:
320 nic['nicparams'] = objects.FillDict(
321 cluster.nicparams[constants.PP_DEFAULT],
325 def _ConnectList(self, client, node_list, call, read_timeout=None):
326 """Helper for computing node addresses.
328 @type client: L{ganeti.rpc.Client}
329 @param client: a C{Client} instance
330 @type node_list: list
331 @param node_list: the node list we should connect
333 @param call: the name of the remote procedure call, for filling in
334 correctly any eventual offline nodes' results
335 @type read_timeout: int
336 @param read_timeout: overwrites the default read timeout for the
340 all_nodes = self._cfg.GetAllNodesInfo()
344 for node in node_list:
345 if node in all_nodes:
346 if all_nodes[node].offline:
347 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
349 val = all_nodes[node].primary_ip
352 addr_list.append(val)
353 name_list.append(node)
355 client.ConnectList(name_list, address_list=addr_list,
356 read_timeout=read_timeout)
359 def _ConnectNode(self, client, node, call, read_timeout=None):
360 """Helper for computing one node's address.
362 @type client: L{ganeti.rpc.Client}
363 @param client: a C{Client} instance
365 @param node: the node we should connect
367 @param call: the name of the remote procedure call, for filling in
368 correctly any eventual offline nodes' results
369 @type read_timeout: int
370 @param read_timeout: overwrites the default read timeout for the
374 node_info = self._cfg.GetNodeInfo(node)
375 if node_info is not None:
376 if node_info.offline:
377 return RpcResult(node=node, offline=True, call=call)
378 addr = node_info.primary_ip
381 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
383 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
384 """Helper for making a multi-node call
387 body = serializer.DumpJson(args, indent=False)
388 c = Client(procedure, body, self.port)
389 skip_dict = self._ConnectList(c, node_list, procedure,
390 read_timeout=read_timeout)
391 skip_dict.update(c.GetResults())
395 def _StaticMultiNodeCall(cls, node_list, procedure, args,
396 address_list=None, read_timeout=None):
397 """Helper for making a multi-node static call
400 body = serializer.DumpJson(args, indent=False)
401 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
402 c.ConnectList(node_list, address_list=address_list,
403 read_timeout=read_timeout)
404 return c.GetResults()
406 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
407 """Helper for making a single-node call
410 body = serializer.DumpJson(args, indent=False)
411 c = Client(procedure, body, self.port)
412 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
414 # we did connect, node is not offline
415 result = c.GetResults()[node]
419 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
420 """Helper for making a single-node static call
423 body = serializer.DumpJson(args, indent=False)
424 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
425 c.ConnectNode(node, read_timeout=read_timeout)
426 return c.GetResults()[node]
430 """Compresses a string for transport over RPC.
432 Small amounts of data are not compressed.
437 @return: Encoded data to send
440 # Small amounts of data are not compressed
442 return (constants.RPC_ENCODING_NONE, data)
444 # Compress with zlib and encode in base64
445 return (constants.RPC_ENCODING_ZLIB_BASE64,
446 base64.b64encode(zlib.compress(data, 3)))
452 def call_lv_list(self, node_list, vg_name):
453 """Gets the logical volumes present in a given volume group.
455 This is a multi-node call.
458 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
460 def call_vg_list(self, node_list):
461 """Gets the volume group list.
463 This is a multi-node call.
466 return self._MultiNodeCall(node_list, "vg_list", [])
468 def call_storage_list(self, node_list, su_name, su_args, name, fields):
469 """Get list of storage units.
471 This is a multi-node call.
474 return self._MultiNodeCall(node_list, "storage_list",
475 [su_name, su_args, name, fields])
477 def call_storage_modify(self, node, su_name, su_args, name, changes):
478 """Modify a storage unit.
480 This is a single-node call.
483 return self._SingleNodeCall(node, "storage_modify",
484 [su_name, su_args, name, changes])
486 def call_storage_execute(self, node, su_name, su_args, name, op):
487 """Executes an operation on a storage unit.
489 This is a single-node call.
492 return self._SingleNodeCall(node, "storage_execute",
493 [su_name, su_args, name, op])
495 def call_bridges_exist(self, node, bridges_list):
496 """Checks if a node has all the bridges given.
498 This method checks if all bridges given in the bridges_list are
499 present on the remote node, so that an instance that uses interfaces
500 on those bridges can be started.
502 This is a single-node call.
505 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
507 def call_instance_start(self, node, instance, hvp, bep):
508 """Starts an instance.
510 This is a single-node call.
513 idict = self._InstDict(instance, hvp=hvp, bep=bep)
514 return self._SingleNodeCall(node, "instance_start", [idict])
516 def call_instance_shutdown(self, node, instance, timeout):
517 """Stops an instance.
519 This is a single-node call.
522 return self._SingleNodeCall(node, "instance_shutdown",
523 [self._InstDict(instance), timeout])
525 def call_migration_info(self, node, instance):
526 """Gather the information necessary to prepare an instance migration.
528 This is a single-node call.
531 @param node: the node on which the instance is currently running
532 @type instance: C{objects.Instance}
533 @param instance: the instance definition
536 return self._SingleNodeCall(node, "migration_info",
537 [self._InstDict(instance)])
539 def call_accept_instance(self, node, instance, info, target):
540 """Prepare a node to accept an instance.
542 This is a single-node call.
545 @param node: the target node for the migration
546 @type instance: C{objects.Instance}
547 @param instance: the instance definition
548 @type info: opaque/hypervisor specific (string/data)
549 @param info: result for the call_migration_info call
551 @param target: target hostname (usually ip address) (on the node itself)
554 return self._SingleNodeCall(node, "accept_instance",
555 [self._InstDict(instance), info, target])
557 def call_finalize_migration(self, node, instance, info, success):
558 """Finalize any target-node migration specific operation.
560 This is called both in case of a successful migration and in case of error
561 (in which case it should abort the migration).
563 This is a single-node call.
566 @param node: the target node for the migration
567 @type instance: C{objects.Instance}
568 @param instance: the instance definition
569 @type info: opaque/hypervisor specific (string/data)
570 @param info: result for the call_migration_info call
571 @type success: boolean
572 @param success: whether the migration was a success or a failure
575 return self._SingleNodeCall(node, "finalize_migration",
576 [self._InstDict(instance), info, success])
578 def call_instance_migrate(self, node, instance, target, live):
579 """Migrate an instance.
581 This is a single-node call.
584 @param node: the node on which the instance is currently running
585 @type instance: C{objects.Instance}
586 @param instance: the instance definition
588 @param target: the target node name
590 @param live: whether the migration should be done live or not (the
591 interpretation of this parameter is left to the hypervisor)
594 return self._SingleNodeCall(node, "instance_migrate",
595 [self._InstDict(instance), target, live])
597 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
598 """Reboots an instance.
600 This is a single-node call.
603 return self._SingleNodeCall(node, "instance_reboot",
604 [self._InstDict(inst), reboot_type,
607 def call_instance_os_add(self, node, inst, reinstall, debug):
608 """Installs an OS on the given instance.
610 This is a single-node call.
613 return self._SingleNodeCall(node, "instance_os_add",
614 [self._InstDict(inst), reinstall, debug])
616 def call_instance_run_rename(self, node, inst, old_name, debug):
617 """Run the OS rename script for an instance.
619 This is a single-node call.
622 return self._SingleNodeCall(node, "instance_run_rename",
623 [self._InstDict(inst), old_name, debug])
625 def call_instance_info(self, node, instance, hname):
626 """Returns information about a single instance.
628 This is a single-node call.
631 @param node: the list of nodes to query
632 @type instance: string
633 @param instance: the instance name
635 @param hname: the hypervisor type of the instance
638 return self._SingleNodeCall(node, "instance_info", [instance, hname])
640 def call_instance_migratable(self, node, instance):
641 """Checks whether the given instance can be migrated.
643 This is a single-node call.
645 @param node: the node to query
646 @type instance: L{objects.Instance}
647 @param instance: the instance to check
651 return self._SingleNodeCall(node, "instance_migratable",
652 [self._InstDict(instance)])
654 def call_all_instances_info(self, node_list, hypervisor_list):
655 """Returns information about all instances on the given nodes.
657 This is a multi-node call.
659 @type node_list: list
660 @param node_list: the list of nodes to query
661 @type hypervisor_list: list
662 @param hypervisor_list: the hypervisors to query for instances
665 return self._MultiNodeCall(node_list, "all_instances_info",
668 def call_instance_list(self, node_list, hypervisor_list):
669 """Returns the list of running instances on a given node.
671 This is a multi-node call.
673 @type node_list: list
674 @param node_list: the list of nodes to query
675 @type hypervisor_list: list
676 @param hypervisor_list: the hypervisors to query for instances
679 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
681 def call_node_tcp_ping(self, node, source, target, port, timeout,
683 """Do a TcpPing on the remote node
685 This is a single-node call.
688 return self._SingleNodeCall(node, "node_tcp_ping",
689 [source, target, port, timeout,
692 def call_node_has_ip_address(self, node, address):
693 """Checks if a node has the given IP address.
695 This is a single-node call.
698 return self._SingleNodeCall(node, "node_has_ip_address", [address])
700 def call_node_info(self, node_list, vg_name, hypervisor_type):
701 """Return node information.
703 This will return memory information and volume group size and free
706 This is a multi-node call.
708 @type node_list: list
709 @param node_list: the list of nodes to query
710 @type vg_name: C{string}
711 @param vg_name: the name of the volume group to ask for disk space
713 @type hypervisor_type: C{str}
714 @param hypervisor_type: the name of the hypervisor to ask for
718 return self._MultiNodeCall(node_list, "node_info",
719 [vg_name, hypervisor_type])
721 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
722 """Add a node to the cluster.
724 This is a single-node call.
727 return self._SingleNodeCall(node, "node_add",
728 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
730 def call_node_verify(self, node_list, checkdict, cluster_name):
731 """Request verification of given parameters.
733 This is a multi-node call.
736 return self._MultiNodeCall(node_list, "node_verify",
737 [checkdict, cluster_name])
740 def call_node_start_master(cls, node, start_daemons, no_voting):
741 """Tells a node to activate itself as a master.
743 This is a single-node call.
746 return cls._StaticSingleNodeCall(node, "node_start_master",
747 [start_daemons, no_voting])
750 def call_node_stop_master(cls, node, stop_daemons):
751 """Tells a node to demote itself from master status.
753 This is a single-node call.
756 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
759 def call_master_info(cls, node_list):
760 """Query master info.
762 This is a multi-node call.
765 # TODO: should this method query down nodes?
766 return cls._StaticMultiNodeCall(node_list, "master_info", [])
769 def call_version(cls, node_list):
770 """Query node version.
772 This is a multi-node call.
775 return cls._StaticMultiNodeCall(node_list, "version", [])
777 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
778 """Request creation of a given block device.
780 This is a single-node call.
783 return self._SingleNodeCall(node, "blockdev_create",
784 [bdev.ToDict(), size, owner, on_primary, info])
786 def call_blockdev_remove(self, node, bdev):
787 """Request removal of a given block device.
789 This is a single-node call.
792 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
794 def call_blockdev_rename(self, node, devlist):
795 """Request rename of the given block devices.
797 This is a single-node call.
800 return self._SingleNodeCall(node, "blockdev_rename",
801 [(d.ToDict(), uid) for d, uid in devlist])
803 def call_blockdev_assemble(self, node, disk, owner, on_primary):
804 """Request assembling of a given block device.
806 This is a single-node call.
809 return self._SingleNodeCall(node, "blockdev_assemble",
810 [disk.ToDict(), owner, on_primary])
812 def call_blockdev_shutdown(self, node, disk):
813 """Request shutdown of a given block device.
815 This is a single-node call.
818 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
820 def call_blockdev_addchildren(self, node, bdev, ndevs):
821 """Request adding a list of children to a (mirroring) device.
823 This is a single-node call.
826 return self._SingleNodeCall(node, "blockdev_addchildren",
828 [disk.ToDict() for disk in ndevs]])
830 def call_blockdev_removechildren(self, node, bdev, ndevs):
831 """Request removing a list of children from a (mirroring) device.
833 This is a single-node call.
836 return self._SingleNodeCall(node, "blockdev_removechildren",
838 [disk.ToDict() for disk in ndevs]])
840 def call_blockdev_getmirrorstatus(self, node, disks):
841 """Request status of a (mirroring) device.
843 This is a single-node call.
846 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
847 [dsk.ToDict() for dsk in disks])
848 if not result.fail_msg:
849 result.payload = [objects.BlockDevStatus.FromDict(i)
850 for i in result.payload]
853 def call_blockdev_find(self, node, disk):
854 """Request identification of a given block device.
856 This is a single-node call.
859 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
860 if not result.fail_msg and result.payload is not None:
861 result.payload = objects.BlockDevStatus.FromDict(result.payload)
864 def call_blockdev_close(self, node, instance_name, disks):
865 """Closes the given block devices.
867 This is a single-node call.
870 params = [instance_name, [cf.ToDict() for cf in disks]]
871 return self._SingleNodeCall(node, "blockdev_close", params)
873 def call_blockdev_getsizes(self, node, disks):
874 """Returns the size of the given disks.
876 This is a single-node call.
879 params = [[cf.ToDict() for cf in disks]]
880 return self._SingleNodeCall(node, "blockdev_getsize", params)
882 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
883 """Disconnects the network of the given drbd devices.
885 This is a multi-node call.
888 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
889 [nodes_ip, [cf.ToDict() for cf in disks]])
891 def call_drbd_attach_net(self, node_list, nodes_ip,
892 disks, instance_name, multimaster):
893 """Disconnects the given drbd devices.
895 This is a multi-node call.
898 return self._MultiNodeCall(node_list, "drbd_attach_net",
899 [nodes_ip, [cf.ToDict() for cf in disks],
900 instance_name, multimaster])
902 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
903 """Waits for the synchronization of drbd devices is complete.
905 This is a multi-node call.
908 return self._MultiNodeCall(node_list, "drbd_wait_sync",
909 [nodes_ip, [cf.ToDict() for cf in disks]])
912 def call_upload_file(cls, node_list, file_name, address_list=None):
915 The node will refuse the operation in case the file is not on the
918 This is a multi-node call.
920 @type node_list: list
921 @param node_list: the list of node names to upload to
923 @param file_name: the filename to upload
924 @type address_list: list or None
925 @keyword address_list: an optional list of node addresses, in order
926 to optimize the RPC speed
929 file_contents = utils.ReadFile(file_name)
930 data = cls._Compress(file_contents)
931 st = os.stat(file_name)
932 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
933 st.st_atime, st.st_mtime]
934 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
935 address_list=address_list)
938 def call_write_ssconf_files(cls, node_list, values):
939 """Write ssconf files.
941 This is a multi-node call.
944 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
946 def call_os_diagnose(self, node_list):
947 """Request a diagnose of OS definitions.
949 This is a multi-node call.
952 return self._MultiNodeCall(node_list, "os_diagnose", [])
954 def call_os_get(self, node, name):
955 """Returns an OS definition.
957 This is a single-node call.
960 result = self._SingleNodeCall(node, "os_get", [name])
961 if not result.fail_msg and isinstance(result.payload, dict):
962 result.payload = objects.OS.FromDict(result.payload)
965 def call_hooks_runner(self, node_list, hpath, phase, env):
966 """Call the hooks runner.
969 - op: the OpCode instance
970 - env: a dictionary with the environment
972 This is a multi-node call.
975 params = [hpath, phase, env]
976 return self._MultiNodeCall(node_list, "hooks_runner", params)
978 def call_iallocator_runner(self, node, name, idata):
979 """Call an iallocator on a remote node
982 - name: the iallocator name
983 - input: the json-encoded input string
985 This is a single-node call.
988 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
990 def call_blockdev_grow(self, node, cf_bdev, amount):
991 """Request a snapshot of the given block device.
993 This is a single-node call.
996 return self._SingleNodeCall(node, "blockdev_grow",
997 [cf_bdev.ToDict(), amount])
999 def call_blockdev_export(self, node, cf_bdev,
1000 dest_node, dest_path, cluster_name):
1001 """Export a given disk to another node.
1003 This is a single-node call.
1006 return self._SingleNodeCall(node, "blockdev_export",
1007 [cf_bdev.ToDict(), dest_node, dest_path,
1010 def call_blockdev_snapshot(self, node, cf_bdev):
1011 """Request a snapshot of the given block device.
1013 This is a single-node call.
1016 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1018 def call_finalize_export(self, node, instance, snap_disks):
1019 """Request the completion of an export operation.
1021 This writes the export config file, etc.
1023 This is a single-node call.
1027 for disk in snap_disks:
1028 if isinstance(disk, bool):
1029 flat_disks.append(disk)
1031 flat_disks.append(disk.ToDict())
1033 return self._SingleNodeCall(node, "finalize_export",
1034 [self._InstDict(instance), flat_disks])
1036 def call_export_info(self, node, path):
1037 """Queries the export information in a given path.
1039 This is a single-node call.
1042 return self._SingleNodeCall(node, "export_info", [path])
1044 def call_export_list(self, node_list):
1045 """Gets the stored exports list.
1047 This is a multi-node call.
1050 return self._MultiNodeCall(node_list, "export_list", [])
1052 def call_export_remove(self, node, export):
1053 """Requests removal of a given export.
1055 This is a single-node call.
1058 return self._SingleNodeCall(node, "export_remove", [export])
1061 def call_node_leave_cluster(cls, node, modify_ssh_setup):
1062 """Requests a node to clean the cluster information it has.
1064 This will remove the configuration information from the ganeti data
1067 This is a single-node call.
1070 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1073 def call_node_volumes(self, node_list):
1074 """Gets all volumes on node(s).
1076 This is a multi-node call.
1079 return self._MultiNodeCall(node_list, "node_volumes", [])
1081 def call_node_demote_from_mc(self, node):
1082 """Demote a node from the master candidate role.
1084 This is a single-node call.
1087 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1089 def call_node_powercycle(self, node, hypervisor):
1090 """Tries to powercycle a node.
1092 This is a single-node call.
1095 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1097 def call_test_delay(self, node_list, duration):
1098 """Sleep for a fixed time on given node(s).
1100 This is a multi-node call.
1103 return self._MultiNodeCall(node_list, "test_delay", [duration])
1105 def call_file_storage_dir_create(self, node, file_storage_dir):
1106 """Create the given file storage directory.
1108 This is a single-node call.
1111 return self._SingleNodeCall(node, "file_storage_dir_create",
1114 def call_file_storage_dir_remove(self, node, file_storage_dir):
1115 """Remove the given file storage directory.
1117 This is a single-node call.
1120 return self._SingleNodeCall(node, "file_storage_dir_remove",
1123 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1124 new_file_storage_dir):
1125 """Rename file storage directory.
1127 This is a single-node call.
1130 return self._SingleNodeCall(node, "file_storage_dir_rename",
1131 [old_file_storage_dir, new_file_storage_dir])
1134 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1135 """Update job queue.
1137 This is a multi-node call.
1140 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1141 [file_name, cls._Compress(content)],
1142 address_list=address_list)
1145 def call_jobqueue_purge(cls, node):
1148 This is a single-node call.
1151 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1154 def call_jobqueue_rename(cls, node_list, address_list, rename):
1155 """Rename a job queue file.
1157 This is a multi-node call.
1160 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1161 address_list=address_list)
1164 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1165 """Set the drain flag on the queue.
1167 This is a multi-node call.
1169 @type node_list: list
1170 @param node_list: the list of nodes to query
1171 @type drain_flag: bool
1172 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1175 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1178 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1179 """Validate the hypervisor params.
1181 This is a multi-node call.
1183 @type node_list: list
1184 @param node_list: the list of nodes to query
1185 @type hvname: string
1186 @param hvname: the hypervisor name
1187 @type hvparams: dict
1188 @param hvparams: the hypervisor parameters to be validated
1191 cluster = self._cfg.GetClusterInfo()
1192 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1193 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1196 def call_x509_cert_create(self, node, validity):
1197 """Creates a new X509 certificate for SSL/TLS.
1199 This is a single-node call.
1202 @param validity: Validity in seconds
1205 return self._SingleNodeCall(node, "x509_cert_create", [validity])
1207 def call_x509_cert_remove(self, node, name):
1208 """Removes a X509 certificate.
1210 This is a single-node call.
1213 @param name: Certificate name
1216 return self._SingleNodeCall(node, "x509_cert_remove", [name])
1218 def call_import_start(self, node, opts, instance, dest, dest_args):
1219 """Starts a listener for an import.
1221 This is a single-node call.
1224 @param node: Node name
1225 @type instance: C{objects.Instance}
1226 @param instance: Instance object
1229 return self._SingleNodeCall(node, "import_start",
1231 self._InstDict(instance), dest,
1232 _EncodeImportExportIO(dest, dest_args)])
1234 def call_export_start(self, node, opts, host, port,
1235 instance, source, source_args):
1236 """Starts an export daemon.
1238 This is a single-node call.
1241 @param node: Node name
1242 @type instance: C{objects.Instance}
1243 @param instance: Instance object
1246 return self._SingleNodeCall(node, "export_start",
1247 [opts.ToDict(), host, port,
1248 self._InstDict(instance), source,
1249 _EncodeImportExportIO(source, source_args)])
1251 def call_impexp_status(self, node, names):
1252 """Gets the status of an import or export.
1254 This is a single-node call.
1257 @param node: Node name
1258 @type names: List of strings
1259 @param names: Import/export names
1260 @rtype: List of L{objects.ImportExportStatus} instances
1261 @return: Returns a list of the state of each named import/export or None if
1262 a status couldn't be retrieved
1265 result = self._SingleNodeCall(node, "impexp_status", [names])
1267 if not result.fail_msg:
1270 for i in result.payload:
1272 decoded.append(None)
1274 decoded.append(objects.ImportExportStatus.FromDict(i))
1276 result.payload = decoded
1280 def call_impexp_abort(self, node, name):
1281 """Aborts an import or export.
1283 This is a single-node call.
1286 @param node: Node name
1288 @param name: Import/export name
1291 return self._SingleNodeCall(node, "impexp_abort", [name])
1293 def call_impexp_cleanup(self, node, name):
1294 """Cleans up after an import or export.
1296 This is a single-node call.
1299 @param node: Node name
1301 @param name: Import/export name
1304 return self._SingleNodeCall(node, "impexp_cleanup", [name])