4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
37 from ganeti import utils
38 from ganeti import objects
39 from ganeti import http
40 from ganeti import serializer
41 from ganeti import constants
42 from ganeti import errors
44 import ganeti.http.client
47 # Module level variable
52 """Initializes the module-global HTTP client manager.
54 Must be called before using any RPC function.
59 assert not _http_manager, "RPC module initialized more than once"
61 _http_manager = http.client.HttpClientManager()
65 """Stops the module-global HTTP client manager.
67 Must be called before quitting the program.
73 _http_manager.Shutdown()
77 class RpcResult(object):
80 This class holds an RPC result. It is needed since in multi-node
81 calls we can't raise an exception just because one out of many
82 failed, and therefore we use this class to encapsulate the result.
85 def __init__(self, data, failed=False, call=None, node=None):
97 """If the result has failed, raise an OpExecError.
99 This is used so that LU code doesn't have to check for each
100 result, but instead can call this function.
104 raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
105 (self.call, self.node, self.error))
111 This class, given a (remote) method name, a list of parameters and a
112 list of nodes, will contact (in parallel) all nodes, and return a
113 dict of results (key: node name, value: result).
115 One current bug is that generic failure is still signalled by
116 'False' result, which is not good. This overloading of values can
120 def __init__(self, procedure, body, port):
121 self.procedure = procedure
127 http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
128 ssl_cert_path=constants.SSL_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
  """Add a list of nodes to the target nodes.

  Every (name, address) pair is handed, in order, to L{ConnectNode}.

  @type node_list: list
  @param node_list: the list of node names to connect
  @type address_list: list or None
  @keyword address_list: either None or a list with node addresses,
      which must have the same length as the node list; when None,
      each node is connected with an address of None

  """
  if address_list is None:
    # No addresses known: pair every node name with None
    address_list = [None] * len(node_list)

  assert len(node_list) == len(address_list), \
         "Name and address lists should have the same length"
  for node, address in zip(node_list, address_list):
    self.ConnectNode(node, address)
148 def ConnectNode(self, name, address=None):
149 """Add a node to the target list.
152 @param name: the node name
154 @keyword address: the node address, if known
161 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
162 "/%s" % self.procedure,
164 ssl_params=self._ssl_params,
165 ssl_verify_peer=True)
167 def GetResults(self):
168 """Call nodes and return results.
171 @returns: List of RPC results
174 assert _http_manager, "RPC module not intialized"
176 _http_manager.ExecRequests(self.nc.values())
180 for name, req in self.nc.iteritems():
181 if req.success and req.resp_status_code == http.HTTP_OK:
182 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
183 node=name, call=self.procedure)
186 # TODO: Better error reporting
192 logging.error("RPC error from node %s: %s", name, msg)
193 results[name] = RpcResult(data=msg, failed=True, node=name,
199 class RpcRunner(object):
200 """RPC runner class"""
202 def __init__(self, cfg):
203 """Initialize the rpc runner.
205 @type cfg: C{config.ConfigWriter}
206 @param cfg: the configuration object that will be used to get data
211 self.port = utils.GetNodeDaemonPort()
213 def _InstDict(self, instance):
214 """Convert the given instance to a dict.
216 This is done via the instance's ToDict() method and additionally
217 we fill the hvparams with the cluster defaults.
219 @type instance: L{objects.Instance}
220 @param instance: an Instance object
222 @return: the instance dict, with the hvparams filled with the
226 idict = instance.ToDict()
227 cluster = self._cfg.GetClusterInfo()
228 idict["hvparams"] = cluster.FillHV(instance)
229 idict["beparams"] = cluster.FillBE(instance)
232 def _ConnectList(self, client, node_list):
233 """Helper for computing node addresses.
235 @type client: L{Client}
236 @param client: a C{Client} instance
237 @type node_list: list
238 @param node_list: the node list we should connect
241 all_nodes = self._cfg.GetAllNodesInfo()
243 for node in node_list:
244 if node in all_nodes:
245 val = all_nodes[node].primary_ip
248 addr_list.append(val)
249 client.ConnectList(node_list, address_list=addr_list)
251 def _ConnectNode(self, client, node):
252 """Helper for computing one node's address.
254 @type client: L{Client}
255 @param client: a C{Client} instance
257 @param node: the node we should connect
260 node_info = self._cfg.GetNodeInfo(node)
261 if node_info is not None:
262 addr = node_info.primary_ip
265 client.ConnectNode(node, address=addr)
267 def _MultiNodeCall(self, node_list, procedure, args,
269 """Helper for making a multi-node call
272 body = serializer.DumpJson(args, indent=False)
273 c = Client(procedure, body, self.port)
274 if address_list is None:
275 self._ConnectList(c, node_list)
277 c.ConnectList(node_list, address_list=address_list)
278 return c.GetResults()
281 def _StaticMultiNodeCall(cls, node_list, procedure, args,
283 """Helper for making a multi-node static call
286 body = serializer.DumpJson(args, indent=False)
287 c = Client(procedure, body, utils.GetNodeDaemonPort())
288 c.ConnectList(node_list, address_list=address_list)
289 return c.GetResults()
def _SingleNodeCall(self, node, procedure, args):
  """Helper for making a single-node call.

  The arguments are serialized with C{serializer.DumpJson}, a
  L{Client} is created for the procedure on C{self.port}, and the
  node is attached to it through L{_ConnectNode}.

  @param node: the node to contact
  @param procedure: the name of the remote procedure
  @param args: the arguments, serialized as the request body
  @return: the entry for C{node} in the client's results, or False
      if the results contain no entry for it

  """
  body = serializer.DumpJson(args, indent=False)
  client = Client(procedure, body, self.port)
  self._ConnectNode(client, node)
  return client.GetResults().get(node, False)
301 def _StaticSingleNodeCall(cls, node, procedure, args):
302 """Helper for making a single-node static call
305 body = serializer.DumpJson(args, indent=False)
306 c = Client(procedure, body, utils.GetNodeDaemonPort())
308 return c.GetResults().get(node, False)
def call_volume_list(self, node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param vg_name: the volume group name, sent as the only argument
  @return: whatever L{_MultiNodeCall} returns for "volume_list"

  """
  return self._MultiNodeCall(node_list, "volume_list", [vg_name])
def call_vg_list(self, node_list):
  """Gets the volume group list.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: whatever L{_MultiNodeCall} returns for "vg_list"

  """
  return self._MultiNodeCall(node_list, "vg_list", [])
def call_bridges_exist(self, node, bridges_list):
  """Checks if a node has all the bridges given.

  This method checks if all bridges given in the bridges_list are
  present on the remote node, so that an instance that uses interfaces
  on those bridges can be started.

  This is a single-node call.

  @param node: the node to query
  @param bridges_list: the bridges to check, sent as one argument
  @return: whatever L{_SingleNodeCall} returns for "bridges_exist"

  """
  return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
def call_instance_start(self, node, instance, extra_args):
  """Starts an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance object; it is converted with
      L{_InstDict} before being sent
  @param extra_args: forwarded unchanged as the second argument
  @return: whatever L{_SingleNodeCall} returns for "instance_start"

  """
  params = [self._InstDict(instance), extra_args]
  return self._SingleNodeCall(node, "instance_start", params)
def call_instance_shutdown(self, node, instance):
  """Stops an instance.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance object; it is converted with
      L{_InstDict} before being sent
  @return: whatever L{_SingleNodeCall} returns for "instance_shutdown"

  """
  params = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "instance_shutdown", params)
def call_instance_migrate(self, node, instance, target, live):
  """Migrate an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition; it is converted with
      L{_InstDict} before being sent
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @return: whatever L{_SingleNodeCall} returns for "instance_migrate"

  """
  params = [self._InstDict(instance), target, live]
  return self._SingleNodeCall(node, "instance_migrate", params)
379 def call_instance_reboot(self, node, instance, reboot_type, extra_args):
380 """Reboots an instance.
382 This is a single-node call.
385 return self._SingleNodeCall(node, "instance_reboot",
386 [self._InstDict(instance), reboot_type,
def call_instance_os_add(self, node, inst):
  """Installs an OS on the given instance.

  This is a single-node call.

  @param node: the node to contact
  @param inst: the instance object; it is converted with
      L{_InstDict} before being sent
  @return: whatever L{_SingleNodeCall} returns for "instance_os_add"

  """
  params = [self._InstDict(inst)]
  return self._SingleNodeCall(node, "instance_os_add", params)
def call_instance_run_rename(self, node, inst, old_name):
  """Run the OS rename script for an instance.

  This is a single-node call.

  @param node: the node to contact
  @param inst: the instance object; it is converted with
      L{_InstDict} before being sent
  @param old_name: forwarded unchanged as the second argument
  @return: whatever L{_SingleNodeCall} returns for
      "instance_run_rename"

  """
  params = [self._InstDict(inst), old_name]
  return self._SingleNodeCall(node, "instance_run_rename", params)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance
  @return: whatever L{_SingleNodeCall} returns for "instance_info"

  """
  return self._SingleNodeCall(node, "instance_info", [instance, hname])
422 def call_all_instances_info(self, node_list, hypervisor_list):
423 """Returns information about all instances on the given nodes.
425 This is a multi-node call.
427 @type node_list: list
428 @param node_list: the list of nodes to query
429 @type hypervisor_list: list
430 @param hypervisor_list: the hypervisors to query for instances
433 return self._MultiNodeCall(node_list, "all_instances_info",
def call_instance_list(self, node_list, hypervisor_list):
  """Returns the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances
  @return: whatever L{_MultiNodeCall} returns for "instance_list"

  """
  return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
449 def call_node_tcp_ping(self, node, source, target, port, timeout,
451 """Do a TcpPing on the remote node
453 This is a single-node call.
456 return self._SingleNodeCall(node, "node_tcp_ping",
457 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
  """Checks if a node has the given IP address.

  This is a single-node call.

  @param node: the node to query
  @param address: the IP address to check, sent as the only argument
  @return: whatever L{_SingleNodeCall} returns for
      "node_has_ip_address"

  """
  return self._SingleNodeCall(node, "node_has_ip_address", [address])
468 def call_node_info(self, node_list, vg_name, hypervisor_type):
469 """Return node information.
471 This will return memory information and volume group size and free
474 This is a multi-node call.
476 @type node_list: list
477 @param node_list: the list of nodes to query
478 @type vgname: C{string}
479 @param vgname: the name of the volume group to ask for disk space
481 @type hypervisor_type: C{str}
482 @param hypervisor_type: the name of the hypervisor to ask for
486 retux = self._MultiNodeCall(node_list, "node_info",
487 [vg_name, hypervisor_type])
489 for result in retux.itervalues():
490 if result.failed or not isinstance(result.data, dict):
493 utils.CheckDict(result.data, {
494 'memory_total' : '-',
497 'vg_size' : 'node_unreachable',
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  This is a single-node call.

  The six values after C{node} are forwarded unchanged, in signature
  order, as the argument list of the remote call.

  @param node: the node to contact
  @return: whatever L{_SingleNodeCall} returns for "node_add"

  """
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  return self._SingleNodeCall(node, "node_add", params)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Request verification of given parameters.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param checkdict: forwarded unchanged as the first argument
  @param cluster_name: forwarded unchanged as the second argument
  @return: whatever L{_MultiNodeCall} returns for "node_verify"

  """
  params = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", params)
521 def call_node_start_master(cls, node, start_daemons):
522 """Tells a node to activate itself as a master.
524 This is a single-node call.
527 return cls._StaticSingleNodeCall(node, "node_start_master",
531 def call_node_stop_master(cls, node, stop_daemons):
532 """Tells a node to demote itself from master status.
534 This is a single-node call.
537 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
540 def call_master_info(cls, node_list):
541 """Query master info.
543 This is a multi-node call.
546 # TODO: should this method query down nodes?
547 return cls._StaticMultiNodeCall(node_list, "master_info", [])
def call_version(self, node_list):
  """Query node version.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: whatever L{_MultiNodeCall} returns for "version"

  """
  return self._MultiNodeCall(node_list, "version", [])
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Request creation of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param bdev: the device object; its C{ToDict()} result is sent
  @param size: forwarded unchanged
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @param info: forwarded unchanged
  @return: whatever L{_SingleNodeCall} returns for "blockdev_create"

  """
  params = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", params)
def call_blockdev_remove(self, node, bdev):
  """Request removal of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param bdev: the device object; its C{ToDict()} result is sent
  @return: whatever L{_SingleNodeCall} returns for "blockdev_remove"

  """
  return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
def call_blockdev_rename(self, node, devlist):
  """Request rename of the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param devlist: iterable of (device, uid) pairs; each device is
      replaced by its C{ToDict()} result, the uid is kept as is
  @return: whatever L{_SingleNodeCall} returns for "blockdev_rename"

  """
  params = [(dev.ToDict(), uid) for dev, uid in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", params)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Request assembling of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param disk: the disk object; its C{ToDict()} result is sent
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @return: whatever L{_SingleNodeCall} returns for "blockdev_assemble"

  """
  params = [disk.ToDict(), owner, on_primary]
  return self._SingleNodeCall(node, "blockdev_assemble", params)
def call_blockdev_shutdown(self, node, disk):
  """Request shutdown of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param disk: the disk object; its C{ToDict()} result is sent
  @return: whatever L{_SingleNodeCall} returns for "blockdev_shutdown"

  """
  return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
600 def call_blockdev_addchildren(self, node, bdev, ndevs):
601 """Request adding a list of children to a (mirroring) device.
603 This is a single-node call.
606 return self._SingleNodeCall(node, "blockdev_addchildren",
608 [disk.ToDict() for disk in ndevs]])
610 def call_blockdev_removechildren(self, node, bdev, ndevs):
611 """Request removing a list of children from a (mirroring) device.
613 This is a single-node call.
616 return self._SingleNodeCall(node, "blockdev_removechildren",
618 [disk.ToDict() for disk in ndevs]])
def call_blockdev_getmirrorstatus(self, node, disks):
  """Request status of a (mirroring) device.

  This is a single-node call.

  @param node: the node to contact
  @param disks: iterable of disk objects; the C{ToDict()} result of
      each one becomes one argument of the remote call
  @return: whatever L{_SingleNodeCall} returns for
      "blockdev_getmirrorstatus"

  """
  params = [dsk.ToDict() for dsk in disks]
  return self._SingleNodeCall(node, "blockdev_getmirrorstatus", params)
def call_blockdev_find(self, node, disk):
  """Request identification of a given block device.

  This is a single-node call.

  @param node: the node to contact
  @param disk: the disk object; its C{ToDict()} result is sent
  @return: whatever L{_SingleNodeCall} returns for "blockdev_find"

  """
  return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
def call_blockdev_close(self, node, disks):
  """Closes the given block devices.

  This is a single-node call.

  @param node: the node to contact
  @param disks: iterable of disk objects; the C{ToDict()} result of
      each one becomes one argument of the remote call
  @return: whatever L{_SingleNodeCall} returns for "blockdev_close"

  """
  params = [cf.ToDict() for cf in disks]
  return self._SingleNodeCall(node, "blockdev_close", params)
647 def call_upload_file(cls, node_list, file_name, address_list=None):
650 The node will refuse the operation in case the file is not on the
653 This is a multi-node call.
655 @type node_list: list
656 @param node_list: the list of node names to upload to
658 @param file_name: the filename to upload
659 @type address_list: list or None
660 @keyword address_list: an optional list of node addresses, in order
661 to optimize the RPC speed
664 data = utils.ReadFile(file_name)
665 st = os.stat(file_name)
666 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
667 st.st_atime, st.st_mtime]
668 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
669 address_list=address_list)
672 def call_write_ssconf_files(cls, node_list, values):
673 """Write ssconf files.
675 This is a multi-node call.
678 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
680 def call_os_diagnose(self, node_list):
681 """Request a diagnose of OS definitions.
683 This is a multi-node call.
686 result = self._MultiNodeCall(node_list, "os_diagnose", [])
688 for node_name, node_result in result.iteritems():
689 if not node_result.failed and node_result.data:
690 node_result.data = [objects.OS.FromDict(oss)
691 for oss in node_result.data]
694 def call_os_get(self, node, name):
695 """Returns an OS definition.
697 This is a single-node call.
700 result = self._SingleNodeCall(node, "os_get", [name])
701 if not result.failed and isinstance(result.data, dict):
702 result.data = objects.OS.FromDict(result.data)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param hpath: forwarded unchanged as the first argument
  @param phase: forwarded unchanged as the second argument
  @param env: a dictionary with the environment
  @return: whatever L{_MultiNodeCall} returns for "hooks_runner"

  """
  params = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", params)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @param node: the node to contact
  @param name: the iallocator name
  @param idata: the json-encoded input string
  @return: whatever L{_SingleNodeCall} returns for "iallocator_runner"

  """
  return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Request growing of the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the device object; its C{ToDict()} result is sent
  @param amount: forwarded unchanged as the second argument
  @return: whatever L{_SingleNodeCall} returns for "blockdev_grow"

  """
  params = [cf_bdev.ToDict(), amount]
  return self._SingleNodeCall(node, "blockdev_grow", params)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the device object; its C{ToDict()} result is sent
  @return: whatever L{_SingleNodeCall} returns for "blockdev_snapshot"

  """
  return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
747 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
749 """Request the export of a given snapshot.
751 This is a single-node call.
754 return self._SingleNodeCall(node, "snapshot_export",
755 [snap_bdev.ToDict(), dest_node,
756 self._InstDict(instance), cluster_name, idx])
758 def call_finalize_export(self, node, instance, snap_disks):
759 """Request the completion of an export operation.
761 This writes the export config file, etc.
763 This is a single-node call.
767 for disk in snap_disks:
768 flat_disks.append(disk.ToDict())
770 return self._SingleNodeCall(node, "finalize_export",
771 [self._InstDict(instance), flat_disks])
773 def call_export_info(self, node, path):
774 """Queries the export information in a given path.
776 This is a single-node call.
779 result = self._SingleNodeCall(node, "export_info", [path])
780 if not result.failed and result.data:
781 result.data = objects.SerializableConfigParser.Loads(str(result.data))
784 def call_instance_os_import(self, node, inst, src_node, src_images,
786 """Request the import of a backup into an instance.
788 This is a single-node call.
791 return self._SingleNodeCall(node, "instance_os_import",
792 [self._InstDict(inst), src_node, src_images,
def call_export_list(self, node_list):
  """Gets the stored exports list.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: whatever L{_MultiNodeCall} returns for "export_list"

  """
  return self._MultiNodeCall(node_list, "export_list", [])
def call_export_remove(self, node, export):
  """Requests removal of a given export.

  This is a single-node call.

  @param node: the node to contact
  @param export: the export to remove, sent as the only argument
  @return: whatever L{_SingleNodeCall} returns for "export_remove"

  """
  return self._SingleNodeCall(node, "export_remove", [export])
812 def call_node_leave_cluster(cls, node):
813 """Requests a node to clean the cluster information it has.
815 This will remove the configuration information from the ganeti data
818 This is a single-node call.
821 return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: whatever L{_MultiNodeCall} returns for "node_volumes"

  """
  return self._MultiNodeCall(node_list, "node_volumes", [])
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call.

  @param node: the node to contact
  @return: whatever L{_SingleNodeCall} returns for
      "node_demote_from_mc"

  """
  return self._SingleNodeCall(node, "node_demote_from_mc", [])
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param duration: the sleep time, sent as the only argument
  @return: whatever L{_MultiNodeCall} returns for "test_delay"

  """
  return self._MultiNodeCall(node_list, "test_delay", [duration])
847 def call_file_storage_dir_create(self, node, file_storage_dir):
848 """Create the given file storage directory.
850 This is a single-node call.
853 return self._SingleNodeCall(node, "file_storage_dir_create",
856 def call_file_storage_dir_remove(self, node, file_storage_dir):
857 """Remove the given file storage directory.
859 This is a single-node call.
862 return self._SingleNodeCall(node, "file_storage_dir_remove",
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param old_file_storage_dir: the current directory, sent first
  @param new_file_storage_dir: the new directory, sent second
  @return: whatever L{_SingleNodeCall} returns for
      "file_storage_dir_rename"

  """
  params = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", params)
876 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
879 This is a multi-node call.
882 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
883 [file_name, content],
884 address_list=address_list)
887 def call_jobqueue_purge(cls, node):
890 This is a single-node call.
893 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
896 def call_jobqueue_rename(cls, node_list, address_list, old, new):
897 """Rename a job queue file.
899 This is a multi-node call.
902 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
903 address_list=address_list)
906 def call_jobqueue_set_drain(cls, node_list, drain_flag):
907 """Set the drain flag on the queue.
909 This is a multi-node call.
911 @type node_list: list
912 @param node_list: the list of nodes to query
913 @type drain_flag: bool
914 @param drain_flag: if True, will set the drain flag, otherwise reset it.
917 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
920 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
921 """Validate the hypervisor params.
923 This is a multi-node call.
925 @type node_list: list
926 @param node_list: the list of nodes to query
928 @param hvname: the hypervisor name
930 @param hvparams: the hypervisor parameters to be validated
933 cluster = self._cfg.GetClusterInfo()
934 hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
935 return self._MultiNodeCall(node_list, "hypervisor_validate_params",