4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
46 import ganeti.http.client
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
61 assert not _http_manager, "RPC module initialized more than once"
63 _http_manager = http.client.HttpClientManager()
67 """Stops the module-global HTTP client manager.
69 Must be called before quitting the program.
75 _http_manager.Shutdown()
79 class RpcResult(object):
82 This class holds an RPC result. It is needed since in multi-node
83 calls we can't raise an exception just because one out of many
84 failed, and therefore we use this class to encapsulate the result.
86 @ivar data: the data payload, for successful results, or None
88 @ivar failed: whether the operation failed at RPC level (not
89 application level on the remote node)
90 @ivar call: the name of the RPC call
91 @ivar node: the name of the node to which we made the call
92 @ivar offline: whether the operation failed because the node was
93 offline, as opposed to actual failure; offline=True will always
94 imply failed=True, in order to allow simpler checking if
95 the user doesn't care about the exact failure mode
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
101 self.offline = offline
106 self.error = "Node is marked offline"
116 """If the result has failed, raise an OpExecError.
118 This is used so that LU code doesn't have to check for each
119 result, but instead can call this function.
123 raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124 (self.call, self.node, self.error))
130 This class, given a (remote) method name, a list of parameters and a
131 list of nodes, will contact (in parallel) all nodes, and return a
132 dict of results (key: node name, value: result).
134 One current bug is that generic failure is still signalled by
135 'False' result, which is not good. This overloading of values can
139 def __init__(self, procedure, body, port):
140 self.procedure = procedure
146 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
147 ssl_cert_path=constants.SSL_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
    """Register every node of C{node_list} as a target of this call.

    Names and addresses are paired by position and each pair is handed
    to C{self.ConnectNode}.

    @type node_list: list
    @param node_list: the names of the nodes to connect
    @type address_list: list or None
    @keyword address_list: either None, in which case every node is
        connected with a None address, or a list of node addresses
        with exactly as many entries as C{node_list}

    """
    addresses = address_list
    if addresses is None:
        # No addresses known: pass None for each node
        addresses = [None for _ in node_list]

    assert len(node_list) == len(addresses), \
        "Name and address lists should have the same length"

    for pair in zip(node_list, addresses):
        self.ConnectNode(*pair)
167 def ConnectNode(self, name, address=None):
168 """Add a node to the target list.
171 @param name: the node name
173 @keyword address: the node address, if known
180 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
181 "/%s" % self.procedure,
183 ssl_params=self._ssl_params,
184 ssl_verify_peer=True)
186 def GetResults(self):
187 """Call nodes and return results.
190 @returns: List of RPC results
193 assert _http_manager, "RPC module not intialized"
195 _http_manager.ExecRequests(self.nc.values())
199 for name, req in self.nc.iteritems():
200 if req.success and req.resp_status_code == http.HTTP_OK:
201 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
202 node=name, call=self.procedure)
205 # TODO: Better error reporting
211 logging.error("RPC error from node %s: %s", name, msg)
212 results[name] = RpcResult(data=msg, failed=True, node=name,
218 class RpcRunner(object):
219 """RPC runner class"""
221 def __init__(self, cfg):
222 """Initialize the rpc runner.
224 @type cfg: C{config.ConfigWriter}
225 @param cfg: the configuration object that will be used to get data
230 self.port = utils.GetNodeDaemonPort()
232 def _InstDict(self, instance):
233 """Convert the given instance to a dict.
235 This is done via the instance's ToDict() method and additionally
236 we fill the hvparams with the cluster defaults.
238 @type instance: L{objects.Instance}
239 @param instance: an Instance object
241 @return: the instance dict, with the hvparams filled with the
245 idict = instance.ToDict()
246 cluster = self._cfg.GetClusterInfo()
247 idict["hvparams"] = cluster.FillHV(instance)
248 idict["beparams"] = cluster.FillBE(instance)
251 def _ConnectList(self, client, node_list):
252 """Helper for computing node addresses.
254 @type client: L{Client}
255 @param client: a C{Client} instance
256 @type node_list: list
257 @param node_list: the node list we should connect
260 all_nodes = self._cfg.GetAllNodesInfo()
264 for node in node_list:
265 if node in all_nodes:
266 if all_nodes[node].offline:
267 skip_dict[node] = RpcResult(node=node, offline=True)
269 val = all_nodes[node].primary_ip
272 addr_list.append(val)
273 name_list.append(node)
275 client.ConnectList(name_list, address_list=addr_list)
278 def _ConnectNode(self, client, node):
279 """Helper for computing one node's address.
281 @type client: L{Client}
282 @param client: a C{Client} instance
284 @param node: the node we should connect
287 node_info = self._cfg.GetNodeInfo(node)
288 if node_info is not None:
289 if node_info.offline:
290 return RpcResult(node=node, offline=True)
291 addr = node_info.primary_ip
294 client.ConnectNode(node, address=addr)
296 def _MultiNodeCall(self, node_list, procedure, args):
297 """Helper for making a multi-node call
300 body = serializer.DumpJson(args, indent=False)
301 c = Client(procedure, body, self.port)
302 skip_dict = self._ConnectList(c, node_list)
303 skip_dict.update(c.GetResults())
307 def _StaticMultiNodeCall(cls, node_list, procedure, args,
309 """Helper for making a multi-node static call
312 body = serializer.DumpJson(args, indent=False)
313 c = Client(procedure, body, utils.GetNodeDaemonPort())
314 c.ConnectList(node_list, address_list=address_list)
315 return c.GetResults()
317 def _SingleNodeCall(self, node, procedure, args):
318 """Helper for making a single-node call
321 body = serializer.DumpJson(args, indent=False)
322 c = Client(procedure, body, self.port)
323 result = self._ConnectNode(c, node)
325 # we did connect, node is not offline
326 result = c.GetResults()[node]
330 def _StaticSingleNodeCall(cls, node, procedure, args):
331 """Helper for making a single-node static call
334 body = serializer.DumpJson(args, indent=False)
335 c = Client(procedure, body, utils.GetNodeDaemonPort())
337 return c.GetResults()[node]
341 """Compresses a string for transport over RPC.
343 Small amounts of data are not compressed.
348 @return: Encoded data to send
351 # Small amounts of data are not compressed
353 return (constants.RPC_ENCODING_NONE, data)
355 # Compress with zlib and encode in base64
356 return (constants.RPC_ENCODING_ZLIB_BASE64,
357 base64.b64encode(zlib.compress(data, 3)))
def call_volume_list(self, node_list, vg_name):
    """Query several nodes for the logical volumes of a volume group.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes to query
    @param vg_name: the volume group whose volumes are requested
    @return: the result of C{self._MultiNodeCall} for the
        "volume_list" procedure

    """
    rpc_args = [vg_name]
    return self._MultiNodeCall(node_list, "volume_list", rpc_args)
def call_vg_list(self, node_list):
    """Query several nodes for their volume group list.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for the "vg_list"
        procedure, which takes no arguments

    """
    no_args = []
    return self._MultiNodeCall(node_list, "vg_list", no_args)
def call_bridges_exist(self, node, bridges_list):
    """Ask a node whether all the given bridges are present on it.

    The check lets the caller know whether an instance using
    interfaces on those bridges can be started on the remote node.

    This is a single-node call.

    @param node: the node to query
    @param bridges_list: the bridges that must all exist
    @return: the result of C{self._SingleNodeCall} for the
        "bridges_exist" procedure

    """
    rpc_args = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", rpc_args)
def call_instance_start(self, node, instance, extra_args):
    """Ask a node to start an instance.

    This is a single-node call.

    @param node: the node that should start the instance
    @param instance: the instance object; it is converted with
        C{self._InstDict} before being sent
    @param extra_args: passed through unchanged as the second
        argument of the remote procedure
    @return: the result of C{self._SingleNodeCall} for the
        "instance_start" procedure

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict, extra_args]
    return self._SingleNodeCall(node, "instance_start", rpc_args)
def call_instance_shutdown(self, node, instance):
    """Ask a node to stop an instance.

    This is a single-node call.

    @param node: the node that should stop the instance
    @param instance: the instance object; it is converted with
        C{self._InstDict} before being sent
    @return: the result of C{self._SingleNodeCall} for the
        "instance_shutdown" procedure

    """
    rpc_args = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "instance_shutdown", rpc_args)
def call_instance_migrate(self, node, instance, target, live):
    """Ask a node to migrate an instance to another node.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)
    @return: the result of C{self._SingleNodeCall} for the
        "instance_migrate" procedure

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict, target, live]
    return self._SingleNodeCall(node, "instance_migrate", rpc_args)
428 def call_instance_reboot(self, node, instance, reboot_type, extra_args):
429 """Reboots an instance.
431 This is a single-node call.
434 return self._SingleNodeCall(node, "instance_reboot",
435 [self._InstDict(instance), reboot_type,
def call_instance_os_add(self, node, inst):
    """Ask a node to install an OS on the given instance.

    This is a single-node call.

    @param node: the node that performs the installation
    @param inst: the instance object; it is converted with
        C{self._InstDict} before being sent
    @return: the result of C{self._SingleNodeCall} for the
        "instance_os_add" procedure

    """
    rpc_args = [self._InstDict(inst)]
    return self._SingleNodeCall(node, "instance_os_add", rpc_args)
def call_instance_run_rename(self, node, inst, old_name):
    """Ask a node to run the OS rename script for an instance.

    This is a single-node call.

    @param node: the node that runs the script
    @param inst: the instance object; it is converted with
        C{self._InstDict} before being sent
    @param old_name: sent as the second argument of the remote
        procedure (presumably the instance's previous name)
    @return: the result of C{self._SingleNodeCall} for the
        "instance_run_rename" procedure

    """
    inst_dict = self._InstDict(inst)
    rpc_args = [inst_dict, old_name]
    return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
def call_instance_info(self, node, instance, hname):
    """Return information about a single instance.

    This is a single-node call.

    @param node: the node to query (a single node, not a list)
    @type instance: string
    @param instance: the instance name
    @param hname: the hypervisor type of the instance
    @return: the result of C{self._SingleNodeCall} for the
        "instance_info" procedure

    """
    rpc_args = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", rpc_args)
471 def call_all_instances_info(self, node_list, hypervisor_list):
472 """Returns information about all instances on the given nodes.
474 This is a multi-node call.
476 @type node_list: list
477 @param node_list: the list of nodes to query
478 @type hypervisor_list: list
479 @param hypervisor_list: the hypervisors to query for instances
482 return self._MultiNodeCall(node_list, "all_instances_info",
def call_instance_list(self, node_list, hypervisor_list):
    """Query several nodes for the instances running on them.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances
    @return: the result of C{self._MultiNodeCall} for the
        "instance_list" procedure

    """
    rpc_args = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", rpc_args)
498 def call_node_tcp_ping(self, node, source, target, port, timeout,
500 """Do a TcpPing on the remote node
502 This is a single-node call.
505 return self._SingleNodeCall(node, "node_tcp_ping",
506 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
    """Ask a node whether it has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for
    @return: the result of C{self._SingleNodeCall} for the
        "node_has_ip_address" procedure

    """
    rpc_args = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
517 def call_node_info(self, node_list, vg_name, hypervisor_type):
518 """Return node information.
520 This will return memory information and volume group size and free
523 This is a multi-node call.
525 @type node_list: list
526 @param node_list: the list of nodes to query
527 @type vg_name: C{string}
528 @param vg_name: the name of the volume group to ask for disk space
530 @type hypervisor_type: C{str}
531 @param hypervisor_type: the name of the hypervisor to ask for
535 retux = self._MultiNodeCall(node_list, "node_info",
536 [vg_name, hypervisor_type])
538 for result in retux.itervalues():
539 if result.failed or not isinstance(result.data, dict):
542 utils.CheckDict(result.data, {
543 'memory_total' : '-',
546 'vg_size' : 'node_unreachable',
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Ask a node to add itself to the cluster.

    This is a single-node call.

    @param node: the node being added
    @param dsa: first remote argument
    @param dsapub: second remote argument
    @param rsa: third remote argument
    @param rsapub: fourth remote argument
    @param ssh: fifth remote argument
    @param sshpub: sixth remote argument
    @return: the result of C{self._SingleNodeCall} for the "node_add"
        procedure

    NOTE(review): the six values look like private/public key material
    (DSA, RSA, SSH); confirm against the node daemon.

    """
    key_material = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", key_material)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Ask several nodes to verify the given parameters.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes to query
    @param checkdict: the checks to perform, sent as the first remote
        argument
    @param cluster_name: sent as the second remote argument
    @return: the result of C{self._MultiNodeCall} for the
        "node_verify" procedure

    """
    rpc_args = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", rpc_args)
570 def call_node_start_master(cls, node, start_daemons):
571 """Tells a node to activate itself as a master.
573 This is a single-node call.
576 return cls._StaticSingleNodeCall(node, "node_start_master",
def call_node_stop_master(cls, node, stop_daemons):
    """Tell a node to demote itself from master status.

    This is a single-node call, made through the static helper
    C{cls._StaticSingleNodeCall}.

    @param node: the node to demote
    @param stop_daemons: sent as the only remote argument
    @return: the result of C{cls._StaticSingleNodeCall} for the
        "node_stop_master" procedure

    """
    rpc_args = [stop_daemons]
    return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
def call_master_info(cls, node_list):
    """Query several nodes for their master information.

    This is a multi-node call, made through the static helper
    C{cls._StaticMultiNodeCall}.

    @type node_list: list
    @param node_list: the nodes to query
    @return: the result of C{cls._StaticMultiNodeCall} for the
        "master_info" procedure, which takes no arguments

    """
    # TODO: should this method query down nodes?
    no_args = []
    return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
def call_version(self, node_list):
    """Query several nodes for their version.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for the "version"
        procedure, which takes no arguments

    """
    no_args = []
    return self._MultiNodeCall(node_list, "version", no_args)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Ask a node to create the given block device.

    This is a single-node call.

    @param node: the node that creates the device
    @param bdev: the block device object; its C{ToDict()} form is sent
    @param size: second remote argument
    @param owner: third remote argument
    @param on_primary: fourth remote argument
    @param info: fifth remote argument
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_create" procedure

    """
    bdev_dict = bdev.ToDict()
    rpc_args = [bdev_dict, size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", rpc_args)
def call_blockdev_remove(self, node, bdev):
    """Ask a node to remove the given block device.

    This is a single-node call.

    @param node: the node that removes the device
    @param bdev: the block device object; its C{ToDict()} form is sent
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_remove" procedure

    """
    rpc_args = [bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
def call_blockdev_rename(self, node, devlist):
    """Ask a node to rename the given block devices.

    This is a single-node call.

    @param node: the node that renames the devices
    @param devlist: an iterable of (device, uid) pairs; each device is
        sent in its C{ToDict()} form next to its unchanged uid
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_rename" procedure

    """
    renames = []
    for device, uid in devlist:
        renames.append((device.ToDict(), uid))
    return self._SingleNodeCall(node, "blockdev_rename", renames)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Ask a node to assemble the given block device.

    This is a single-node call.

    @param node: the node that assembles the device
    @param disk: the block device object; its C{ToDict()} form is sent
    @param owner: second remote argument
    @param on_primary: third remote argument
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_assemble" procedure

    """
    disk_dict = disk.ToDict()
    rpc_args = [disk_dict, owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
def call_blockdev_shutdown(self, node, disk):
    """Ask a node to shut down the given block device.

    This is a single-node call.

    @param node: the node that shuts the device down
    @param disk: the block device object; its C{ToDict()} form is sent
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_shutdown" procedure

    """
    rpc_args = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
649 def call_blockdev_addchildren(self, node, bdev, ndevs):
650 """Request adding a list of children to a (mirroring) device.
652 This is a single-node call.
655 return self._SingleNodeCall(node, "blockdev_addchildren",
657 [disk.ToDict() for disk in ndevs]])
659 def call_blockdev_removechildren(self, node, bdev, ndevs):
660 """Request removing a list of children from a (mirroring) device.
662 This is a single-node call.
665 return self._SingleNodeCall(node, "blockdev_removechildren",
667 [disk.ToDict() for disk in ndevs]])
def call_blockdev_getmirrorstatus(self, node, disks):
    """Ask a node for the status of (mirroring) devices.

    This is a single-node call.

    @param node: the node to query
    @param disks: an iterable of block device objects; the C{ToDict()}
        form of each one becomes one remote argument
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_getmirrorstatus" procedure

    """
    disk_dicts = []
    for disk in disks:
        disk_dicts.append(disk.ToDict())
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus", disk_dicts)
def call_blockdev_find(self, node, disk):
    """Ask a node to identify the given block device.

    This is a single-node call.

    @param node: the node to query
    @param disk: the block device object; its C{ToDict()} form is sent
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_find" procedure

    """
    rpc_args = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_find", rpc_args)
def call_blockdev_close(self, node, disks):
    """Ask a node to close the given block devices.

    This is a single-node call.

    @param node: the node that closes the devices
    @param disks: an iterable of block device objects; the C{ToDict()}
        form of each one becomes one remote argument
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_close" procedure

    """
    disk_dicts = []
    for disk in disks:
        disk_dicts.append(disk.ToDict())
    return self._SingleNodeCall(node, "blockdev_close", disk_dicts)
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a local file to several nodes.

    The file is read with C{utils.ReadFile}, encoded with
    C{cls._Compress} and sent together with its mode, owner, group and
    access/modification times as reported by C{os.stat}.

    NOTE(review): the receiving node may refuse the operation for
    files it does not accept; the exact rule is not visible here.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed
    @return: the result of C{cls._StaticMultiNodeCall} for the
        "upload_file" procedure

    """
    contents = utils.ReadFile(file_name)
    payload = cls._Compress(contents)
    info = os.stat(file_name)
    rpc_args = [
        file_name,
        payload,
        info.st_mode,
        info.st_uid,
        info.st_gid,
        info.st_atime,
        info.st_mtime,
    ]
    return cls._StaticMultiNodeCall(node_list, "upload_file", rpc_args,
                                    address_list=address_list)
def call_write_ssconf_files(cls, node_list, values):
    """Ask several nodes to write their ssconf files.

    This is a multi-node call, made through the static helper
    C{cls._StaticMultiNodeCall}.

    @type node_list: list
    @param node_list: the nodes to contact
    @param values: sent as the only remote argument
    @return: the result of C{cls._StaticMultiNodeCall} for the
        "write_ssconf_files" procedure

    """
    rpc_args = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
730 def call_os_diagnose(self, node_list):
731 """Request a diagnose of OS definitions.
733 This is a multi-node call.
736 result = self._MultiNodeCall(node_list, "os_diagnose", [])
738 for node_name, node_result in result.iteritems():
739 if not node_result.failed and node_result.data:
740 node_result.data = [objects.OS.FromDict(oss)
741 for oss in node_result.data]
744 def call_os_get(self, node, name):
745 """Returns an OS definition.
747 This is a single-node call.
750 result = self._SingleNodeCall(node, "os_get", [name])
751 if not result.failed and isinstance(result.data, dict):
752 result.data = objects.OS.FromDict(result.data)
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner on several nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes on which to run the hooks
    @param hpath: first remote argument (presumably the hooks path;
        confirm against the node daemon)
    @param phase: second remote argument (presumably the hook phase;
        confirm against the node daemon)
    @param env: a dictionary with the environment
    @return: the result of C{self._MultiNodeCall} for the
        "hooks_runner" procedure

    """
    params = [hpath, phase, env]
    return self._MultiNodeCall(node_list, "hooks_runner", params)
def call_iallocator_runner(self, node, name, idata):
    """Run an iallocator on a remote node.

    This is a single-node call.

    @param node: the node on which the iallocator runs
    @param name: the iallocator name
    @param idata: the json-encoded input string
    @return: the result of C{self._SingleNodeCall} for the
        "iallocator_runner" procedure

    """
    rpc_args = [name, idata]
    return self._SingleNodeCall(node, "iallocator_runner", rpc_args)
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing the given block device.

    This is a single-node call.

    @param node: the node that holds the device
    @param cf_bdev: the block device object; its C{ToDict()} form is
        sent
    @param amount: how much to grow the device by, sent unchanged as
        the second remote argument (unit not visible here; confirm
        against the node daemon)
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_grow" procedure

    """
    rpc_args = [cf_bdev.ToDict(), amount]
    return self._SingleNodeCall(node, "blockdev_grow", rpc_args)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Ask a node to snapshot the given block device.

    This is a single-node call.

    @param node: the node that holds the device
    @param cf_bdev: the block device object; its C{ToDict()} form is
        sent
    @return: the result of C{self._SingleNodeCall} for the
        "blockdev_snapshot" procedure

    """
    rpc_args = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", rpc_args)
797 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
799 """Request the export of a given snapshot.
801 This is a single-node call.
804 return self._SingleNodeCall(node, "snapshot_export",
805 [snap_bdev.ToDict(), dest_node,
806 self._InstDict(instance), cluster_name, idx])
808 def call_finalize_export(self, node, instance, snap_disks):
809 """Request the completion of an export operation.
811 This writes the export config file, etc.
813 This is a single-node call.
817 for disk in snap_disks:
818 flat_disks.append(disk.ToDict())
820 return self._SingleNodeCall(node, "finalize_export",
821 [self._InstDict(instance), flat_disks])
823 def call_export_info(self, node, path):
824 """Queries the export information in a given path.
826 This is a single-node call.
829 result = self._SingleNodeCall(node, "export_info", [path])
830 if not result.failed and result.data:
831 result.data = objects.SerializableConfigParser.Loads(str(result.data))
834 def call_instance_os_import(self, node, inst, src_node, src_images,
836 """Request the import of a backup into an instance.
838 This is a single-node call.
841 return self._SingleNodeCall(node, "instance_os_import",
842 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
    """Query several nodes for their stored exports list.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for the
        "export_list" procedure, which takes no arguments

    """
    no_args = []
    return self._MultiNodeCall(node_list, "export_list", no_args)
def call_export_remove(self, node, export):
    """Ask a node to remove the given export.

    This is a single-node call.

    @param node: the node that stores the export
    @param export: the export to remove, sent as the only remote
        argument
    @return: the result of C{self._SingleNodeCall} for the
        "export_remove" procedure

    """
    rpc_args = [export]
    return self._SingleNodeCall(node, "export_remove", rpc_args)
def call_node_leave_cluster(cls, node):
    """Ask a node to clean the cluster information it has.

    According to the original note, this removes the configuration
    information from the ganeti data kept on that node.

    This is a single-node call, made through the static helper
    C{cls._StaticSingleNodeCall}.

    @param node: the node that should leave the cluster
    @return: the result of C{cls._StaticSingleNodeCall} for the
        "node_leave_cluster" procedure, which takes no arguments

    """
    no_args = []
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", no_args)
def call_node_volumes(self, node_list):
    """Query one or more nodes for all their volumes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes to query
    @return: the result of C{self._MultiNodeCall} for the
        "node_volumes" procedure, which takes no arguments

    """
    no_args = []
    return self._MultiNodeCall(node_list, "node_volumes", no_args)
def call_node_demote_from_mc(self, node):
    """Ask a node to give up the master candidate role.

    This is a single-node call.

    @param node: the node to demote
    @return: the result of C{self._SingleNodeCall} for the
        "node_demote_from_mc" procedure, which takes no arguments

    """
    no_args = []
    return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
def call_test_delay(self, node_list, duration):
    """Make one or more nodes sleep for a fixed time.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the nodes that should sleep
    @param duration: how long to sleep, sent as the only remote
        argument (unit not visible here)
    @return: the result of C{self._MultiNodeCall} for the
        "test_delay" procedure

    """
    rpc_args = [duration]
    return self._MultiNodeCall(node_list, "test_delay", rpc_args)
897 def call_file_storage_dir_create(self, node, file_storage_dir):
898 """Create the given file storage directory.
900 This is a single-node call.
903 return self._SingleNodeCall(node, "file_storage_dir_create",
906 def call_file_storage_dir_remove(self, node, file_storage_dir):
907 """Remove the given file storage directory.
909 This is a single-node call.
912 return self._SingleNodeCall(node, "file_storage_dir_remove",
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Ask a node to rename a file storage directory.

    This is a single-node call.

    @param node: the node that holds the directory
    @param old_file_storage_dir: the current directory, sent as the
        first remote argument
    @param new_file_storage_dir: the new directory, sent as the second
        remote argument
    @return: the result of C{self._SingleNodeCall} for the
        "file_storage_dir_rename" procedure

    """
    rpc_args = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Send the content of a job queue file to several nodes.

    The content is encoded with C{cls._Compress} before being sent.

    This is a multi-node call, made through the static helper
    C{cls._StaticMultiNodeCall}.

    @type node_list: list
    @param node_list: the nodes to contact
    @param address_list: node addresses, forwarded to the helper as
        its C{address_list} keyword
    @param file_name: sent as the first remote argument
    @param content: the data to send, after encoding
    @return: the result of C{cls._StaticMultiNodeCall} for the
        "jobqueue_update" procedure

    """
    encoded = cls._Compress(content)
    rpc_args = [file_name, encoded]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                    address_list=address_list)
def call_jobqueue_purge(cls, node):
    """Invoke the "jobqueue_purge" procedure on a node.

    This is a single-node call, made through the static helper
    C{cls._StaticSingleNodeCall}.

    @param node: the node to contact
    @return: the result of C{cls._StaticSingleNodeCall} for the
        "jobqueue_purge" procedure, which takes no arguments

    """
    no_args = []
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
def call_jobqueue_rename(cls, node_list, address_list, old, new):
    """Ask several nodes to rename a job queue file.

    This is a multi-node call, made through the static helper
    C{cls._StaticMultiNodeCall}.

    @type node_list: list
    @param node_list: the nodes to contact
    @param address_list: node addresses, forwarded to the helper as
        its C{address_list} keyword
    @param old: the current file name, sent as the first remote
        argument
    @param new: the new file name, sent as the second remote argument
    @return: the result of C{cls._StaticMultiNodeCall} for the
        "jobqueue_rename" procedure

    """
    rpc_args = [old, new]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rpc_args,
                                    address_list=address_list)
956 def call_jobqueue_set_drain(cls, node_list, drain_flag):
957 """Set the drain flag on the queue.
959 This is a multi-node call.
961 @type node_list: list
962 @param node_list: the list of nodes to query
963 @type drain_flag: bool
964 @param drain_flag: if True, will set the drain flag, otherwise reset it.
967 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
970 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
971 """Validate the hypervisor params.
973 This is a multi-node call.
975 @type node_list: list
976 @param node_list: the list of nodes to query
978 @param hvname: the hypervisor name
980 @param hvparams: the hypervisor parameters to be validated
983 cluster = self._cfg.GetClusterInfo()
984 hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
985 return self._MultiNodeCall(node_list, "hypervisor_validate_params",