4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
45 """Node-handling class.
47 For each node that we speak with, we create an instance of this
48 class, so that we have a safe place to store the details of this
# Create an httplib connection object for the node on parent.port and start
# a PUT request for "/<parent.procedure>", declaring parent.body_length as
# the Content-Length. Connection errors are logged with logging.exception;
# the try/except around them is on lines not visible in this excerpt.
52 def __init__(self, parent, node, address=None):
53 """Constructor for the node controller.
55 @type parent: L{Client}
56 @param parent: the C{Client} instance which holds global parameters for
59 @param node: the name of the node we connect to; it is used for error
60 messages and in cases where the address parameter is not passed
62 @keyword address: the node's address, in case we know it, so that we
63 don't need to resolve it; testing shows that httplib has high
64 overhead in resolving addresses (even when specified in /etc/hosts)
73 self.http_conn = hc = httplib.HTTPConnection(address, parent.port)
# skip_accept_encoding=True stops httplib from adding an Accept-Encoding
# header to the request.
76 hc.putrequest('PUT', "/%s" % parent.procedure,
77 skip_accept_encoding=True)
78 hc.putheader('Content-Length', parent.body_length)
82 logging.exception("Error connecting to node %s", node)
# Read the node's HTTP reply and decode its body with simplejson.
# Separate paths exist for an earlier connect failure, a non-200 status and a
# zero Content-Length (logged as an error); the return statements of those
# paths and the final return are on lines not visible in this excerpt.
85 def GetResponse(self):
86 """Try to process the response from the node.
90 # we already failed in connect
92 resp = self.http_conn.getresponse()
93 if resp.status != 200:
96 length = int(resp.getheader('Content-Length', '0'))
100 logging.error("Zero-length reply from node %s", self.node)
102 payload = resp.read(length)
# NOTE(review): simplejson.loads raises ValueError on a malformed payload and
# no handler is visible here; confirm that propagating it is intended.
103 unload = simplejson.loads(payload)
110 This class, given a (remote) method name, a list of parameters and a
111 list of nodes, will contact (in parallel) all nodes, and return a
112 dict of results (key: node name, value: result).
114 One current bug is that generic failure is still signalled by
115 'False' result, which is not good. This overloading of values can
118 @var body_length: cached string value of the length of the body (so that
119 individual C{NodeController} instances don't have to recompute it)
# Capture the node daemon port and password, remember the procedure name,
# and JSON-encode the call arguments once for all target nodes.
126 def __init__(self, procedure, args):
127 self.port = utils.GetNodeDaemonPort()
128 self.nodepw = utils.GetNodeDaemonPassword()
131 self.procedure = procedure
133 self.body = simplejson.dumps(args)
# Kept as a string: NodeController sends it verbatim as the Content-Length
# header value.
134 self.body_length = str(len(self.body))
136 #--- generic connector -------------
# Call ConnectNode for every node, pairing names with addresses by position.
# When no address list is given, every node gets address None.
138 def ConnectList(self, node_list, address_list=None):
139 """Add a list of nodes to the target nodes.
141 @type node_list: list
142 @param node_list: the list of node names to connect
143 @type address_list: list or None
144 @keyword address_list: either None or a list with node addresses,
145 which must have the same length as the node list
148 if address_list is None:
149 address_list = [None for _ in node_list]
# NOTE(review): this length check is an assert, so it is removed when Python
# runs with -O; an explicit exception would keep the check in all modes.
151 assert len(node_list) == len(address_list), \
152 "Name and address lists should have the same length"
153 for node, address in zip(node_list, address_list):
154 self.ConnectNode(node, address)
# Store a new NodeController for this node in self.nc, keyed by node name.
# Constructing the controller starts the HTTP PUT request to the node.
# A second call with the same name replaces the earlier dict entry.
156 def ConnectNode(self, name, address=None):
157 """Add a node to the target list.
160 @param name: the node name
162 @keyword address: the node address, if known
165 self.nc[name] = NodeController(self, name, address)
# GetResults hands back the collected results; its body is not visible here.
# The next method (its def line is also not visible in this excerpt) fills
# self.results by calling GetResponse() on every NodeController in self.nc,
# keyed by node name.
167 def GetResults(self):
168 """Return the results of the call.
174 """Gather results from the node controllers.
176 This function simply calls GetResponse() for each of our node
180 for node, nc in self.nc.items():
181 self.results[node] = nc.GetResponse()
184 class RpcRunner(object):
185 """RPC runner class"""
# Remember the configuration object; the helpers below read it as self._cfg
# (the assignment itself is on lines not visible in this excerpt).
187 def __init__(self, cfg):
188 """Initialize the rpc runner.
190 @type cfg: C{config.ConfigWriter}
191 @param cfg: the configuration object that will be used to get data
# Serialize an instance with ToDict() and overwrite its "hvparams" and
# "beparams" entries with the cluster's FillHV/FillBE results for it.
# The docstring documents this dict as the return value.
197 def _InstDict(self, instance):
198 """Convert the given instance to a dict.
200 This is done via the instance's ToDict() method and additionally
201 we fill the hvparams with the cluster defaults.
203 @type instance: L{objects.Instance}
204 @param instance: an Instance object
206 @return: the instance dict, with the hvparams filled with the
210 idict = instance.ToDict()
211 cluster = self._cfg.GetClusterInfo()
212 idict["hvparams"] = cluster.FillHV(instance)
213 idict["beparams"] = cluster.FillBE(instance)
# Connect the client to every node in node_list. For nodes known to the
# configuration (GetAllNodesInfo), the node's primary_ip is passed as the
# address. The creation of addr_list and the value used for unknown nodes
# are on lines not visible in this excerpt.
216 def _ConnectList(self, client, node_list):
217 """Helper for computing node addresses.
219 @type client: L{Client}
220 @param client: a C{Client} instance
221 @type node_list: list
222 @param node_list: the node list we should connect
225 all_nodes = self._cfg.GetAllNodesInfo()
227 for node in node_list:
228 if node in all_nodes:
229 val = all_nodes[node].primary_ip
232 addr_list.append(val)
233 client.ConnectList(node_list, address_list=addr_list)
# Single-node counterpart of _ConnectList: when GetNodeInfo returns a node
# object, its primary_ip is passed as the address. The fallback used when it
# returns None is on lines not visible in this excerpt.
235 def _ConnectNode(self, client, node):
236 """Helper for computing one node's address.
238 @type client: L{Client}
239 @param client: a C{Client} instance
241 @param node: the node we should connect
244 node_info = self._cfg.GetNodeInfo(node)
245 if node_info is not None:
246 addr = node_info.primary_ip
249 client.ConnectNode(node, address=addr)
# Multi-node RPC "volume_list" with args [vg_name]; returns the per-node
# results dict from Client.GetResults().
251 def call_volume_list(self, node_list, vg_name):
252 """Gets the logical volumes present in a given volume group.
254 This is a multi-node call.
257 c = Client("volume_list", [vg_name])
258 self._ConnectList(c, node_list)
260 return c.GetResults()
# Multi-node RPC "vg_list" with no arguments; returns the per-node results
# dict from Client.GetResults().
262 def call_vg_list(self, node_list):
263 """Gets the volume group list.
265 This is a multi-node call.
268 c = Client("vg_list", [])
269 self._ConnectList(c, node_list)
271 return c.GetResults()
# Single-node RPC "bridges_exist". The whole bridges_list is sent as one
# argument. Returns the node's entry from Client.GetResults(), or False if
# no entry was recorded for it.
273 def call_bridges_exist(self, node, bridges_list):
274 """Checks if a node has all the bridges given.
276 This method checks if all bridges given in the bridges_list are
277 present on the remote node, so that an instance that uses interfaces
278 on those bridges can be started.
280 This is a single-node call.
283 c = Client("bridges_exist", [bridges_list])
284 self._ConnectNode(c, node)
286 return c.GetResults().get(node, False)
# Single-node RPC "instance_start" with the _InstDict-serialized instance and
# extra_args. Returns the node's entry from Client.GetResults(), or False if
# none was recorded.
288 def call_instance_start(self, node, instance, extra_args):
289 """Starts an instance.
291 This is a single-node call.
294 c = Client("instance_start", [self._InstDict(instance), extra_args])
295 self._ConnectNode(c, node)
297 return c.GetResults().get(node, False)
# Single-node RPC "instance_shutdown" with the _InstDict-serialized instance.
# Returns the node's entry from Client.GetResults(), or False if none was
# recorded.
299 def call_instance_shutdown(self, node, instance):
300 """Stops an instance.
302 This is a single-node call.
305 c = Client("instance_shutdown", [self._InstDict(instance)])
306 self._ConnectNode(c, node)
308 return c.GetResults().get(node, False)
# Single-node RPC "instance_migrate", sent to the node currently running the
# instance, with [serialized instance, target, live]. Returns that node's
# entry from Client.GetResults(), or False if none was recorded.
310 def call_instance_migrate(self, node, instance, target, live):
311 """Migrate an instance.
313 This is a single-node call.
316 @param node: the node on which the instance is currently running
317 @type instance: C{objects.Instance}
318 @param instance: the instance definition
320 @param target: the target node name
322 @param live: whether the migration should be done live or not (the
323 interpretation of this parameter is left to the hypervisor)
326 c = Client("instance_migrate", [self._InstDict(instance), target, live])
327 self._ConnectNode(c, node)
329 return c.GetResults().get(node, False)
# Single-node RPC "instance_reboot" with [serialized instance, reboot_type,
# extra_args]. Returns the node's entry from Client.GetResults(), or False
# if none was recorded.
331 def call_instance_reboot(self, node, instance, reboot_type, extra_args):
332 """Reboots an instance.
334 This is a single-node call.
337 c = Client("instance_reboot", [self._InstDict(instance),
338 reboot_type, extra_args])
339 self._ConnectNode(c, node)
341 return c.GetResults().get(node, False)
# Single-node RPC "instance_os_add" with the _InstDict-serialized instance.
# Returns the node's entry from Client.GetResults(), or False if none was
# recorded.
343 def call_instance_os_add(self, node, inst):
344 """Installs an OS on the given instance.
346 This is a single-node call.
349 params = [self._InstDict(inst)]
350 c = Client("instance_os_add", params)
351 self._ConnectNode(c, node)
353 return c.GetResults().get(node, False)
# Single-node RPC "instance_run_rename" with [serialized instance, old_name].
# Returns the node's entry from Client.GetResults(), or False if none was
# recorded.
355 def call_instance_run_rename(self, node, inst, old_name):
356 """Run the OS rename script for an instance.
358 This is a single-node call.
361 params = [self._InstDict(inst), old_name]
362 c = Client("instance_run_rename", params)
363 self._ConnectNode(c, node)
365 return c.GetResults().get(node, False)
# Single-node RPC "instance_info" with [instance, hname]; the instance is
# sent as given (its name, per the docstring), not via _InstDict.
# Returns the node's entry from Client.GetResults(), or False if none was
# recorded.
367 def call_instance_info(self, node, instance, hname):
368 """Returns information about a single instance.
370 This is a single-node call.
372 @type node: string
373 @param node: the node to query
374 @type instance: string
375 @param instance: the instance name
377 @param hname: the hypervisor type of the instance
380 c = Client("instance_info", [instance, hname])
381 self._ConnectNode(c, node)
383 return c.GetResults().get(node, False)
# Multi-node RPC "all_instances_info". The whole hypervisor_list is sent as
# one argument. Returns the per-node results dict from Client.GetResults().
385 def call_all_instances_info(self, node_list, hypervisor_list):
386 """Returns information about all instances on the given nodes.
388 This is a multi-node call.
390 @type node_list: list
391 @param node_list: the list of nodes to query
392 @type hypervisor_list: list
393 @param hypervisor_list: the hypervisors to query for instances
396 c = Client("all_instances_info", [hypervisor_list])
397 self._ConnectList(c, node_list)
399 return c.GetResults()
# Multi-node RPC "instance_list". The whole hypervisor_list is sent as one
# argument. Returns the per-node results dict from Client.GetResults().
401 def call_instance_list(self, node_list, hypervisor_list):
402 """Returns the list of running instances on the given nodes.
404 This is a multi-node call.
406 @type node_list: list
407 @param node_list: the list of nodes to query
408 @type hypervisor_list: list
409 @param hypervisor_list: the hypervisors to query for instances
412 c = Client("instance_list", [hypervisor_list])
413 self._ConnectList(c, node_list)
415 return c.GetResults()
# Single-node RPC "node_tcp_ping". The signature and the argument list both
# continue on lines not visible in this excerpt. Returns the node's entry
# from Client.GetResults(), or False if none was recorded.
417 def call_node_tcp_ping(self, node, source, target, port, timeout,
419 """Do a TcpPing on the remote node
421 This is a single-node call.
424 c = Client("node_tcp_ping", [source, target, port, timeout,
426 self._ConnectNode(c, node)
428 return c.GetResults().get(node, False)
# Single-node RPC "node_has_ip_address" with [address]. Returns the node's
# entry from Client.GetResults(), or False if none was recorded.
430 def call_node_has_ip_address(self, node, address):
431 """Checks if a node has the given IP address.
433 This is a single-node call.
436 c = Client("node_has_ip_address", [address])
437 self._ConnectNode(c, node)
439 return c.GetResults().get(node, False)
# Multi-node RPC "node_info" with [vg_name, hypervisor_type]. Every node
# whose reply is not a dict is logged as unreachable. The partially visible
# dict literal ('-' / 'node_unreachable' values) appears to supply
# placeholder fields; the statement using it and the return are on lines not
# visible here, so confirm.
441 def call_node_info(self, node_list, vg_name, hypervisor_type):
442 """Return node information.
444 This will return memory information and volume group size and free
447 This is a multi-node call.
449 @type node_list: list
450 @param node_list: the list of nodes to query
451 @type vg_name: C{string}
452 @param vg_name: the name of the volume group to ask for disk space
454 @type hypervisor_type: C{str}
455 @param hypervisor_type: the name of the hypervisor to ask for
459 c = Client("node_info", [vg_name, hypervisor_type])
460 self._ConnectList(c, node_list)
462 retux = c.GetResults()
464 for node_name in retux:
465 ret = retux.get(node_name, False)
# node_name comes from iterating retux, so the False default above never
# applies. NOTE(review): type(ret) != dict rejects dict subclasses;
# isinstance(ret, dict) is the idiomatic check.
466 if type(ret) != dict:
467 logging.error("could not connect to node %s", node_name)
471 { 'memory_total' : '-',
474 'vg_size' : 'node_unreachable',
# Single-node RPC "node_add". The six arguments are forwarded in the order
# [dsa, dsapub, rsa, rsapub, ssh, sshpub]. Returns the node's entry from
# Client.GetResults(), or False if none was recorded.
480 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
481 """Add a node to the cluster.
483 This is a single-node call.
486 params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
487 c = Client("node_add", params)
488 self._ConnectNode(c, node)
490 return c.GetResults().get(node, False)
# Multi-node RPC "node_verify" with [checkdict, cluster_name]. Returns the
# per-node results dict from Client.GetResults().
492 def call_node_verify(self, node_list, checkdict, cluster_name):
493 """Request verification of given parameters.
495 This is a multi-node call.
498 c = Client("node_verify", [checkdict, cluster_name])
499 self._ConnectList(c, node_list)
501 return c.GetResults()
# Single-node RPC "node_start_master" with [start_daemons]. It takes no
# self, so the source line just before this def (not visible here) is
# presumably a @staticmethod decorator; verify. How the client is connected
# to the node is also not visible in this excerpt. Returns the node's entry
# from Client.GetResults(), or False if none was recorded.
504 def call_node_start_master(node, start_daemons):
505 """Tells a node to activate itself as a master.
507 This is a single-node call.
510 c = Client("node_start_master", [start_daemons])
513 return c.GetResults().get(node, False)
# Single-node RPC "node_stop_master" with [stop_daemons]. It takes no self,
# so the preceding source line (not visible here) is presumably a
# @staticmethod decorator; verify. The connect step is not visible in this
# excerpt. Returns the node's entry from Client.GetResults(), or False if
# none was recorded.
516 def call_node_stop_master(node, stop_daemons):
517 """Tells a node to demote itself from master status.
519 This is a single-node call.
522 c = Client("node_stop_master", [stop_daemons])
525 return c.GetResults().get(node, False)
# Multi-node RPC "master_info" with no arguments. It takes no self
# (presumably a @staticmethod on the preceding, not visible line; verify).
# It calls Client.ConnectList without addresses, so node names are used
# instead of configuration primary IPs. Returns the per-node results dict.
528 def call_master_info(node_list):
529 """Query master info.
531 This is a multi-node call.
534 # TODO: should this method query down nodes?
535 c = Client("master_info", [])
536 c.ConnectList(node_list)
538 return c.GetResults()
# Multi-node RPC "version" with no arguments. Returns the per-node results
# dict from Client.GetResults().
540 def call_version(self, node_list):
541 """Query node version.
543 This is a multi-node call.
546 c = Client("version", [])
547 self._ConnectList(c, node_list)
549 return c.GetResults()
# Single-node RPC "blockdev_create" with [bdev.ToDict(), size, owner,
# on_primary, info]. Returns the node's entry from Client.GetResults(), or
# False if none was recorded.
551 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
552 """Request creation of a given block device.
554 This is a single-node call.
557 params = [bdev.ToDict(), size, owner, on_primary, info]
558 c = Client("blockdev_create", params)
559 self._ConnectNode(c, node)
561 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_remove" with [bdev.ToDict()]. Returns the node's
# entry from Client.GetResults(), or False if none was recorded.
563 def call_blockdev_remove(self, node, bdev):
564 """Request removal of a given block device.
566 This is a single-node call.
569 c = Client("blockdev_remove", [bdev.ToDict()])
570 self._ConnectNode(c, node)
572 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_rename". devlist is iterated as (disk, uid)
# pairs, and each pair is sent as (disk.ToDict(), uid). Returns the node's
# entry from Client.GetResults(), or False if none was recorded.
574 def call_blockdev_rename(self, node, devlist):
575 """Request rename of the given block devices.
577 This is a single-node call.
580 params = [(d.ToDict(), uid) for d, uid in devlist]
581 c = Client("blockdev_rename", params)
582 self._ConnectNode(c, node)
584 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_assemble" with [disk.ToDict(), owner,
# on_primary]. Returns the node's entry from Client.GetResults(), or False
# if none was recorded.
586 def call_blockdev_assemble(self, node, disk, owner, on_primary):
587 """Request assembling of a given block device.
589 This is a single-node call.
592 params = [disk.ToDict(), owner, on_primary]
593 c = Client("blockdev_assemble", params)
594 self._ConnectNode(c, node)
596 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_shutdown" with [disk.ToDict()]. Returns the
# node's entry from Client.GetResults(), or False if none was recorded.
598 def call_blockdev_shutdown(self, node, disk):
599 """Request shutdown of a given block device.
601 This is a single-node call.
604 c = Client("blockdev_shutdown", [disk.ToDict()])
605 self._ConnectNode(c, node)
607 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_addchildren" with [bdev.ToDict(), <list of the
# ndevs' ToDict() values>]. Returns the node's entry from
# Client.GetResults(), or False if none was recorded.
609 def call_blockdev_addchildren(self, node, bdev, ndevs):
610 """Request adding a list of children to a (mirroring) device.
612 This is a single-node call.
615 params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
616 c = Client("blockdev_addchildren", params)
617 self._ConnectNode(c, node)
619 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_removechildren" with [bdev.ToDict(), <list of
# the ndevs' ToDict() values>]. Returns the node's entry from
# Client.GetResults(), or False if none was recorded.
621 def call_blockdev_removechildren(self, node, bdev, ndevs):
622 """Request removing a list of children from a (mirroring) device.
624 This is a single-node call.
627 params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
628 c = Client("blockdev_removechildren", params)
629 self._ConnectNode(c, node)
631 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_getmirrorstatus". Each disk's ToDict() value is
# sent as its own positional argument (the argument list is the disk list
# itself). Returns the node's entry from Client.GetResults(), or False if
# none was recorded.
633 def call_blockdev_getmirrorstatus(self, node, disks):
634 """Request status of a (mirroring) device.
636 This is a single-node call.
639 params = [dsk.ToDict() for dsk in disks]
640 c = Client("blockdev_getmirrorstatus", params)
641 self._ConnectNode(c, node)
643 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_find" with [disk.ToDict()]. Returns the node's
# entry from Client.GetResults(), or False if none was recorded.
645 def call_blockdev_find(self, node, disk):
646 """Request identification of a given block device.
648 This is a single-node call.
651 c = Client("blockdev_find", [disk.ToDict()])
652 self._ConnectNode(c, node)
654 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_close". Each disk's ToDict() value is sent as
# its own positional argument. Returns the node's entry from
# Client.GetResults(), or False if none was recorded.
656 def call_blockdev_close(self, node, disks):
657 """Closes the given block devices.
659 This is a single-node call.
662 params = [cf.ToDict() for cf in disks]
663 c = Client("blockdev_close", params)
664 self._ConnectNode(c, node)
666 return c.GetResults().get(node, False)
# Multi-node RPC "upload_file". It sends file_name, its data (computed on
# lines not visible in this excerpt) and the file's os.stat mode, uid, gid,
# atime and mtime. It takes no self (presumably a @staticmethod on the
# preceding, not visible line; verify). The optional address_list is passed
# straight to Client.ConnectList. Returns the per-node results dict.
669 def call_upload_file(node_list, file_name, address_list=None):
672 The node will refuse the operation in case the file is not on the
675 This is a multi-node call.
677 @type node_list: list
678 @param node_list: the list of node names to upload to
680 @param file_name: the filename to upload
681 @type address_list: list or None
682 @keyword address_list: an optional list of node addresses, in order
683 to optimize the RPC speed
691 st = os.stat(file_name)
692 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
693 st.st_atime, st.st_mtime]
694 c = Client("upload_file", params)
695 c.ConnectList(node_list, address_list=address_list)
697 return c.GetResults()
# Multi-node RPC "os_diagnose" with no arguments. Each node's truthy result
# list is converted into objects.OS instances and stored in new_result under
# the node's name. The initialisation of new_result, the branch for falsy
# results and the return are on lines not visible in this excerpt.
699 def call_os_diagnose(self, node_list):
700 """Request a diagnose of OS definitions.
702 This is a multi-node call.
705 c = Client("os_diagnose", [])
706 self._ConnectList(c, node_list)
708 result = c.GetResults()
710 for node_name in result:
711 if result[node_name]:
712 nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
715 new_result[node_name] = nr
# Single-node RPC "os_get" with [name]. A dict reply is converted with
# objects.OS.FromDict. What is returned for any other reply (including the
# False default for a missing node entry) is on lines not visible here.
718 def call_os_get(self, node, name):
719 """Returns an OS definition.
721 This is a single-node call.
724 c = Client("os_get", [name])
725 self._ConnectNode(c, node)
727 result = c.GetResults().get(node, False)
728 if isinstance(result, dict):
729 return objects.OS.FromDict(result)
# Multi-node RPC "hooks_runner" with [hpath, phase, env]. The per-node
# results dict is fetched into result; the return is on lines not visible
# in this excerpt.
733 def call_hooks_runner(self, node_list, hpath, phase, env):
734 """Call the hooks runner.
737 - hpath, phase: forwarded unchanged to the remote hooks runner
738 - env: a dictionary with the environment
740 This is a multi-node call.
743 params = [hpath, phase, env]
744 c = Client("hooks_runner", params)
745 self._ConnectList(c, node_list)
747 result = c.GetResults()
# Single-node RPC "iallocator_runner" with [name, idata]. The node's entry
# (or False if none was recorded) is fetched into result; the return is on
# lines not visible in this excerpt.
750 def call_iallocator_runner(self, node, name, idata):
751 """Call an iallocator on a remote node
754 - name: the iallocator name
755 - idata: the json-encoded input string
757 This is a single-node call.
760 params = [name, idata]
761 c = Client("iallocator_runner", params)
762 self._ConnectNode(c, node)
764 result = c.GetResults().get(node, False)
# Single-node RPC "blockdev_grow" with [cf_bdev.ToDict(), amount]. Returns
# the node's entry from Client.GetResults(), or False if none was recorded.
767 def call_blockdev_grow(self, node, cf_bdev, amount):
768 """Request growing of the given block device.
770 This is a single-node call.
773 c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
774 self._ConnectNode(c, node)
776 return c.GetResults().get(node, False)
# Single-node RPC "blockdev_snapshot" with [cf_bdev.ToDict()]. Returns the
# node's entry from Client.GetResults(), or False if none was recorded.
778 def call_blockdev_snapshot(self, node, cf_bdev):
779 """Request a snapshot of the given block device.
781 This is a single-node call.
784 c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
785 self._ConnectNode(c, node)
787 return c.GetResults().get(node, False)
# Single-node RPC "snapshot_export" with [snap_bdev.ToDict(), dest_node,
# serialized instance, cluster_name]. The signature continues on a line not
# visible in this excerpt, where cluster_name is presumably declared.
# Returns the node's entry from Client.GetResults(), or False if none was
# recorded.
789 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
791 """Request the export of a given snapshot.
793 This is a single-node call.
796 params = [snap_bdev.ToDict(), dest_node,
797 self._InstDict(instance), cluster_name]
798 c = Client("snapshot_export", params)
799 self._ConnectNode(c, node)
801 return c.GetResults().get(node, False)
# Single-node RPC "finalize_export" with [serialized instance, <ToDict() of
# each snap_disks entry>]. The initialisation of flat_disks is on a line not
# visible in this excerpt. Returns the node's entry from Client.GetResults(),
# or False if none was recorded.
803 def call_finalize_export(self, node, instance, snap_disks):
804 """Request the completion of an export operation.
806 This writes the export config file, etc.
808 This is a single-node call.
812 for disk in snap_disks:
813 flat_disks.append(disk.ToDict())
814 params = [self._InstDict(instance), flat_disks]
815 c = Client("finalize_export", params)
816 self._ConnectNode(c, node)
818 return c.GetResults().get(node, False)
# Single-node RPC "export_info" with [path]. The node's reply is converted
# with str() and parsed by objects.SerializableConfigParser.Loads. The lines
# between fetching and parsing are not visible in this excerpt; they
# presumably handle a failed (False) reply, so confirm.
820 def call_export_info(self, node, path):
821 """Queries the export information in a given path.
823 This is a single-node call.
826 c = Client("export_info", [path])
827 self._ConnectNode(c, node)
829 result = c.GetResults().get(node, False)
832 return objects.SerializableConfigParser.Loads(str(result))
# Single-node RPC "instance_os_import" with [serialized instance, src_node,
# src_images, cluster_name]. The signature continues on a line not visible
# in this excerpt, where cluster_name is presumably declared. Returns the
# node's entry from Client.GetResults(), or False if none was recorded.
834 def call_instance_os_import(self, node, inst, src_node, src_images,
836 """Request the import of a backup into an instance.
838 This is a single-node call.
841 params = [self._InstDict(inst), src_node, src_images, cluster_name]
842 c = Client("instance_os_import", params)
843 self._ConnectNode(c, node)
845 return c.GetResults().get(node, False)
# Multi-node RPC "export_list" with no arguments. The per-node results dict
# is fetched into result; the return is on lines not visible in this excerpt.
847 def call_export_list(self, node_list):
848 """Gets the stored exports list.
850 This is a multi-node call.
853 c = Client("export_list", [])
854 self._ConnectList(c, node_list)
856 result = c.GetResults()
# Single-node RPC "export_remove" with [export]. Returns the node's entry
# from Client.GetResults(), or False if none was recorded.
859 def call_export_remove(self, node, export):
860 """Requests removal of a given export.
862 This is a single-node call.
865 c = Client("export_remove", [export])
866 self._ConnectNode(c, node)
868 return c.GetResults().get(node, False)
# Single-node RPC "node_leave_cluster" with no arguments. It takes no self
# (presumably a @staticmethod on the preceding, not visible line; verify).
# The connect step is on lines not visible in this excerpt. Returns the
# node's entry from Client.GetResults(), or False if none was recorded.
871 def call_node_leave_cluster(node):
872 """Requests a node to clean the cluster information it has.
874 This will remove the configuration information from the ganeti data
877 This is a single-node call.
880 c = Client("node_leave_cluster", [])
883 return c.GetResults().get(node, False)
# Multi-node RPC "node_volumes" with no arguments. Returns the per-node
# results dict from Client.GetResults().
885 def call_node_volumes(self, node_list):
886 """Gets all volumes on node(s).
888 This is a multi-node call.
891 c = Client("node_volumes", [])
892 self._ConnectList(c, node_list)
894 return c.GetResults()
# Multi-node RPC "test_delay" with [duration]. Returns the per-node results
# dict from Client.GetResults().
896 def call_test_delay(self, node_list, duration):
897 """Sleep for a fixed time on given node(s).
899 This is a multi-node call.
902 c = Client("test_delay", [duration])
903 self._ConnectList(c, node_list)
905 return c.GetResults()
# Single-node RPC "file_storage_dir_create" with [file_storage_dir]. Returns
# the node's entry from Client.GetResults(), or False if none was recorded.
907 def call_file_storage_dir_create(self, node, file_storage_dir):
908 """Create the given file storage directory.
910 This is a single-node call.
913 c = Client("file_storage_dir_create", [file_storage_dir])
914 self._ConnectNode(c, node)
916 return c.GetResults().get(node, False)
# Single-node RPC "file_storage_dir_remove" with [file_storage_dir]. Returns
# the node's entry from Client.GetResults(), or False if none was recorded.
918 def call_file_storage_dir_remove(self, node, file_storage_dir):
919 """Remove the given file storage directory.
921 This is a single-node call.
924 c = Client("file_storage_dir_remove", [file_storage_dir])
925 self._ConnectNode(c, node)
927 return c.GetResults().get(node, False)
# Single-node RPC "file_storage_dir_rename" with [old_file_storage_dir,
# new_file_storage_dir]. Returns the node's entry from Client.GetResults(),
# or False if none was recorded.
929 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
930 new_file_storage_dir):
931 """Rename file storage directory.
933 This is a single-node call.
936 c = Client("file_storage_dir_rename",
937 [old_file_storage_dir, new_file_storage_dir])
938 self._ConnectNode(c, node)
940 return c.GetResults().get(node, False)
# Multi-node RPC "jobqueue_update" with [file_name, content]. It uses the
# caller-supplied address_list via Client.ConnectList and takes no self
# (presumably a @staticmethod on the preceding, not visible line; verify).
# The per-node results are fetched into result; the rest of the method is
# not visible in this excerpt.
943 def call_jobqueue_update(node_list, address_list, file_name, content):
946 This is a multi-node call.
949 c = Client("jobqueue_update", [file_name, content])
950 c.ConnectList(node_list, address_list=address_list)
952 result = c.GetResults()
# Single-node RPC "jobqueue_purge" with no arguments. It takes no self
# (presumably a @staticmethod on the preceding, not visible line; verify),
# and its connect step is not visible in this excerpt. Returns the node's
# entry from Client.GetResults(), or False if none was recorded.
956 def call_jobqueue_purge(node):
959 This is a single-node call.
962 c = Client("jobqueue_purge", [])
965 return c.GetResults().get(node, False)
# Multi-node RPC "jobqueue_rename" with [old, new]. It uses the
# caller-supplied address_list via Client.ConnectList and takes no self
# (presumably a @staticmethod on the preceding, not visible line; verify).
# The per-node results are fetched into result; the rest of the method is
# not visible in this excerpt.
968 def call_jobqueue_rename(node_list, address_list, old, new):
969 """Rename a job queue file.
971 This is a multi-node call.
974 c = Client("jobqueue_rename", [old, new])
975 c.ConnectList(node_list, address_list=address_list)
977 result = c.GetResults()
# Multi-node RPC "jobqueue_set_drain" with [drain_flag]. It calls
# Client.ConnectList without addresses, so node names are used directly.
# It takes no self (presumably a @staticmethod on the preceding, not visible
# line; verify). The per-node results are fetched into result; the rest of
# the method is not visible in this excerpt.
982 def call_jobqueue_set_drain(node_list, drain_flag):
983 """Set the drain flag on the queue.
985 This is a multi-node call.
987 @type node_list: list
988 @param node_list: the list of nodes on which to set the flag
989 @type drain_flag: bool
990 @param drain_flag: if True, will set the drain flag, otherwise reset it.
993 c = Client("jobqueue_set_drain", [drain_flag])
994 c.ConnectList(node_list)
996 result = c.GetResults()
1000 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1001 """Validate the hypervisor params.
1003 This is a multi-node call.
1005 @type node_list: list
1006 @param node_list: the list of nodes to query
1007 @type hvname: string
1008 @param hvname: the hypervisor name
1009 @type hvparams: dict
1010 @param hvparams: the hypervisor parameters to be validated
1013 cluster = self._cfg.GetClusterInfo()
1014 hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1015 c = Client("hypervisor_validate_params", [hvname, hv_full])
1016 self._ConnectList(c, node_list)
1018 result = c.GetResults()