# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
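"""Client side of the node RPC layer.

Defines the HTTP/JSON client used to call a procedure on one or more
nodes, and the RpcRunner class, which exposes one call_* method per
remote procedure.

"""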
# pylint: disable-msg=C0103,R0201,R0904
# C0103: Invalid name; the call_* method names do not match the naming
# rules enforced by pylint
# R0201: Method could be a function; we keep the RPCs as instance methods
# so that they do not have to be changed back and forth between
# static/instance methods if they need to start using instance attributes
# R0904: Too many public methods
import httplib
import os
import socket

import simplejson

from ganeti import logger
from ganeti import objects
from ganeti import utils
45 """Node-handling class.
47 For each node that we speak with, we create an instance of this
48 class, so that we have a safe place to store the details of this
52 def __init__(self, parent, node):
57 self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
60 hc.putrequest('PUT', "/%s" % self.parent.procedure,
61 skip_accept_encoding=True)
62 hc.putheader('Content-Length', str(len(parent.body)))
65 except socket.error, err:
66 logger.Error("Error connecting to %s: %s" % (node, str(err)))
69 def get_response(self):
70 """Try to process the response from the node.
74 # we already failed in connect
76 resp = self.http_conn.getresponse()
77 if resp.status != 200:
80 length = int(resp.getheader('Content-Length', '0'))
84 logger.Error("Zero-length reply from %s" % self.node)
86 payload = resp.read(length)
87 unload = simplejson.loads(payload)
class Client(object):
    """RPC client class.

    This class, given a (remote) method name, a list of parameters and a
    list of nodes, will contact all nodes and return a dict of results
    (key: node name, value: result).  The request is sent to a node as
    soon as the node is added; the replies are collected by run().

    One current bug is that generic failure is still signalled by
    'False' result, which is not good, as it cannot be told apart from
    a remote procedure that legitimately returns False.

    """

    def __init__(self, procedure, args):
        """Prepare a call.

        @param procedure: the name of the remote procedure
        @param args: the arguments of the procedure; must be
            serializable to JSON

        """
        self.port = utils.GetNodeDaemonPort()
        self.nodepw = utils.GetNodeDaemonPassword()
        # node name -> NodeController handling the request to that node
        self.nc = {}
        # node name -> result received from that node
        self.results = {}
        self.procedure = procedure
        # raw arguments, kept next to their serialized form
        self.args = args
        self.body = simplejson.dumps(args)

    #--- generic connector -------------

    def connect_list(self, node_list):
        """Add a list of nodes to the target nodes.

        """
        for node in node_list:
            self.connect(node)

    def connect(self, connect_node):
        """Add a node to the target list.

        Creating the controller already sends the request to the node.

        """
        self.nc[connect_node] = NodeController(self, connect_node)

    def getresult(self):
        """Return the results of the call.

        @rtype: dict
        @return: the results collected by run(), keyed by node name

        """
        return self.results

    def run(self):
        """Collect the response of every node added so far.

        If no node was added, this does nothing.

        """
        for node, nc in self.nc.items():
            self.results[node] = nc.get_response()
class RpcRunner(object):
    """RPC runner class.

    Every call_* method issues one remote procedure call.  Single-node
    calls return the result for the given node, or False if there is no
    result for it; multi-node calls return a dict of results keyed by
    node name.

    """

    def __init__(self, cfg):
        """Initialize the RPC runner.

        @type cfg: C{config.ConfigWriter}
        @param cfg: the configuration object that will be used to get data
            about the cluster

        """
        # NOTE(review): attribute name assumed; no method of this class
        # reads it -- confirm against the users of the class
        self._cfg = cfg

    @staticmethod
    def _multi_node_call(node_list, procedure, args):
        """Run a procedure on several nodes.

        @type node_list: list
        @param node_list: the names of the nodes to contact
        @type procedure: string
        @param procedure: the name of the remote procedure
        @type args: list
        @param args: the arguments of the remote procedure
        @rtype: dict
        @return: the results of the call, keyed by node name

        """
        client = Client(procedure, args)
        client.connect_list(node_list)
        client.run()
        return client.getresult()

    @staticmethod
    def _single_node_call(node, procedure, args):
        """Run a procedure on one node.

        @type node: string
        @param node: the name of the node to contact
        @type procedure: string
        @param procedure: the name of the remote procedure
        @type args: list
        @param args: the arguments of the remote procedure
        @return: the result for the node, or False if there is none

        """
        client = Client(procedure, args)
        client.connect(node)
        client.run()
        return client.getresult().get(node, False)

    def call_volume_list(self, node_list, vg_name):
        """Gets the logical volumes present in a given volume group.

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "volume_list", [vg_name])

    def call_vg_list(self, node_list):
        """Gets the volume group list.

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "vg_list", [])

    def call_bridges_exist(self, node, bridges_list):
        """Checks if a node has all the bridges given.

        This method checks if all bridges given in the bridges_list are
        present on the remote node, so that an instance that uses interfaces
        on those bridges can be started.

        This is a single-node call.

        """
        return self._single_node_call(node, "bridges_exist", [bridges_list])

    def call_instance_start(self, node, instance, extra_args):
        """Starts an instance.

        This is a single-node call.

        """
        params = [instance.ToDict(), extra_args]
        return self._single_node_call(node, "instance_start", params)

    def call_instance_shutdown(self, node, instance):
        """Stops an instance.

        This is a single-node call.

        """
        params = [instance.ToDict()]
        return self._single_node_call(node, "instance_shutdown", params)

    def call_instance_migrate(self, node, instance, target, live):
        """Migrate an instance.

        This is a single-node call.

        @type node: string
        @param node: the node on which the instance is currently running
        @type instance: C{objects.Instance}
        @param instance: the instance definition
        @type target: string
        @param target: the target node name
        @type live: boolean
        @param live: whether the migration should be done live or not (the
            interpretation of this parameter is left to the hypervisor)

        """
        params = [instance.ToDict(), target, live]
        return self._single_node_call(node, "instance_migrate", params)

    def call_instance_reboot(self, node, instance, reboot_type, extra_args):
        """Reboots an instance.

        This is a single-node call.

        """
        params = [instance.ToDict(), reboot_type, extra_args]
        return self._single_node_call(node, "instance_reboot", params)

    def call_instance_os_add(self, node, inst, osdev, swapdev):
        """Installs an OS on the given instance.

        This is a single-node call.

        """
        params = [inst.ToDict(), osdev, swapdev]
        return self._single_node_call(node, "instance_os_add", params)

    def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
        """Run the OS rename script for an instance.

        This is a single-node call.

        """
        params = [inst.ToDict(), old_name, osdev, swapdev]
        return self._single_node_call(node, "instance_run_rename", params)

    def call_instance_info(self, node, instance, hname):
        """Returns information about a single instance.

        This is a single-node call.

        @type node: string
        @param node: the node to query
        @type instance: string
        @param instance: the instance name
        @type hname: string
        @param hname: the hypervisor type of the instance

        """
        # NOTE(review): hname is accepted but not sent to the node; only
        # the instance name is.  Confirm what the remote side expects
        # before adding it to the arguments.
        return self._single_node_call(node, "instance_info", [instance])

    def call_all_instances_info(self, node_list, hypervisor_list):
        """Returns information about all instances on the given nodes.

        This is a multi-node call.

        @type node_list: list
        @param node_list: the list of nodes to query
        @type hypervisor_list: list
        @param hypervisor_list: the hypervisors to query for instances

        """
        return self._multi_node_call(node_list, "all_instances_info",
                                     [hypervisor_list])

    def call_instance_list(self, node_list, hypervisor_list):
        """Returns the list of running instances on a given node.

        This is a multi-node call.

        @type node_list: list
        @param node_list: the list of nodes to query
        @type hypervisor_list: list
        @param hypervisor_list: the hypervisors to query for instances

        """
        return self._multi_node_call(node_list, "instance_list",
                                     [hypervisor_list])

    # NOTE(review): the name of the last parameter is assumed -- confirm
    # against the callers that pass it by keyword
    def call_node_tcp_ping(self, node, source, target, port, timeout,
                           live_port_needed):
        """Do a TcpPing on the remote node

        All arguments after the node name are forwarded unchanged to the
        remote procedure.

        This is a single-node call.

        """
        params = [source, target, port, timeout, live_port_needed]
        return self._single_node_call(node, "node_tcp_ping", params)

    def call_node_info(self, node_list, vg_name, hypervisor_type):
        """Return node information.

        This will return memory information and volume group size and free
        space.  Keys missing from the reply of a node are filled in with
        placeholder values.

        This is a multi-node call.

        @type node_list: list
        @param node_list: the list of nodes to query
        @type vg_name: C{string}
        @param vg_name: the name of the volume group to ask for disk space
            information
        @type hypervisor_type: C{str}
        @param hypervisor_type: the name of the hypervisor to ask for
            memory information

        """
        # NOTE(review): only the 'memory_total' and 'vg_size' entries are
        # certain; the other three keys are assumed -- confirm against
        # the node-side implementation
        defaults = {
            'memory_total': '-',
            'memory_dom0': '-',
            'memory_free': '-',
            'vg_size': 'node_unreachable',
            'vg_free': '-',
            }
        retux = self._multi_node_call(node_list, "node_info",
                                      [vg_name, hypervisor_type])

        for node_name in retux:
            ret = retux.get(node_name, False)
            if not isinstance(ret, dict):
                logger.Error("could not connect to node %s" % (node_name))
                # NOTE(review): the failure value of the node is returned
                # as is; whether it should be replaced by the placeholder
                # dict is unclear -- confirm with the callers
                continue
            for key, value in defaults.items():
                ret.setdefault(key, value)
        return retux

    def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
        """Add a node to the cluster.

        This is a single-node call.

        """
        params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
        return self._single_node_call(node, "node_add", params)

    def call_node_verify(self, node_list, checkdict, cluster_name):
        """Request verification of given parameters.

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "node_verify",
                                     [checkdict, cluster_name])

    @staticmethod
    def call_node_start_master(node, start_daemons):
        """Tells a node to activate itself as a master.

        This is a single-node call.

        """
        return RpcRunner._single_node_call(node, "node_start_master",
                                           [start_daemons])

    @staticmethod
    def call_node_stop_master(node, stop_daemons):
        """Tells a node to demote itself from master status.

        This is a single-node call.

        """
        return RpcRunner._single_node_call(node, "node_stop_master",
                                           [stop_daemons])

    @staticmethod
    def call_master_info(node_list):
        """Query master info.

        This is a multi-node call.

        """
        # TODO: should this method query down nodes?
        return RpcRunner._multi_node_call(node_list, "master_info", [])

    def call_version(self, node_list):
        """Query node version.

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "version", [])

    def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
        """Request creation of a given block device.

        This is a single-node call.

        """
        params = [bdev.ToDict(), size, owner, on_primary, info]
        return self._single_node_call(node, "blockdev_create", params)

    def call_blockdev_remove(self, node, bdev):
        """Request removal of a given block device.

        This is a single-node call.

        """
        return self._single_node_call(node, "blockdev_remove",
                                      [bdev.ToDict()])

    def call_blockdev_rename(self, node, devlist):
        """Request rename of the given block devices.

        This is a single-node call.

        @param devlist: a list of (device, unique id) pairs

        """
        # the list of pairs itself is the argument list of the call
        params = [(d.ToDict(), uid) for d, uid in devlist]
        return self._single_node_call(node, "blockdev_rename", params)

    def call_blockdev_assemble(self, node, disk, owner, on_primary):
        """Request assembling of a given block device.

        This is a single-node call.

        """
        params = [disk.ToDict(), owner, on_primary]
        return self._single_node_call(node, "blockdev_assemble", params)

    def call_blockdev_shutdown(self, node, disk):
        """Request shutdown of a given block device.

        This is a single-node call.

        """
        return self._single_node_call(node, "blockdev_shutdown",
                                      [disk.ToDict()])

    def call_blockdev_addchildren(self, node, bdev, ndevs):
        """Request adding a list of children to a (mirroring) device.

        This is a single-node call.

        """
        params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
        return self._single_node_call(node, "blockdev_addchildren", params)

    def call_blockdev_removechildren(self, node, bdev, ndevs):
        """Request removing a list of children from a (mirroring) device.

        This is a single-node call.

        """
        params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
        return self._single_node_call(node, "blockdev_removechildren",
                                      params)

    def call_blockdev_getmirrorstatus(self, node, disks):
        """Request status of a (mirroring) device.

        This is a single-node call.

        """
        # the serialized disks themselves are the argument list
        params = [dsk.ToDict() for dsk in disks]
        return self._single_node_call(node, "blockdev_getmirrorstatus",
                                      params)

    def call_blockdev_find(self, node, disk):
        """Request identification of a given block device.

        This is a single-node call.

        """
        return self._single_node_call(node, "blockdev_find", [disk.ToDict()])

    def call_blockdev_close(self, node, disks):
        """Closes the given block devices.

        This is a single-node call.

        """
        # the serialized disks themselves are the argument list
        params = [cf.ToDict() for cf in disks]
        return self._single_node_call(node, "blockdev_close", params)

    @staticmethod
    def call_upload_file(node_list, file_name):
        """Upload a file.

        The contents of the local file are sent together with its mode,
        owner, group and access/modification times.

        The node will refuse the operation in case the file is not on the
        approved file list.

        This is a multi-node call.

        """
        fh = open(file_name)
        try:
            data = fh.read()
        finally:
            fh.close()
        st = os.stat(file_name)
        params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
                  st.st_atime, st.st_mtime]
        return RpcRunner._multi_node_call(node_list, "upload_file", params)

    def call_os_diagnose(self, node_list):
        """Request a diagnose of OS definitions.

        This is a multi-node call.

        @rtype: dict
        @return: for each node, the list of C{objects.OS} built from its
            reply; the list is empty if the node returned nothing

        """
        result = self._multi_node_call(node_list, "os_diagnose", [])
        new_result = {}
        for node_name in result:
            if result[node_name]:
                nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
            else:
                nr = []
            new_result[node_name] = nr
        return new_result

    def call_os_get(self, node, name):
        """Returns an OS definition.

        This is a single-node call.

        @return: an C{objects.OS} if the node replied with a dict,
            otherwise the reply unchanged

        """
        result = self._single_node_call(node, "os_get", [name])
        if isinstance(result, dict):
            return objects.OS.FromDict(result)
        return result

    def call_hooks_runner(self, node_list, hpath, phase, env):
        """Call the hooks runner.

        Args (all forwarded to the remote procedure):
          - hpath
          - phase
          - env: a dictionary with the environment

        This is a multi-node call.

        """
        params = [hpath, phase, env]
        return self._multi_node_call(node_list, "hooks_runner", params)

    def call_iallocator_runner(self, node, name, idata):
        """Call an iallocator on a remote node

        Args:
          - name: the iallocator name
          - idata: the json-encoded input string

        This is a single-node call.

        """
        params = [name, idata]
        return self._single_node_call(node, "iallocator_runner", params)

    def call_blockdev_grow(self, node, cf_bdev, amount):
        """Request a grow operation on the given block device.

        This is a single-node call.

        """
        params = [cf_bdev.ToDict(), amount]
        return self._single_node_call(node, "blockdev_grow", params)

    def call_blockdev_snapshot(self, node, cf_bdev):
        """Request a snapshot of the given block device.

        This is a single-node call.

        """
        return self._single_node_call(node, "blockdev_snapshot",
                                      [cf_bdev.ToDict()])

    def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
                             cluster_name):
        """Request the export of a given snapshot.

        This is a single-node call.

        """
        params = [snap_bdev.ToDict(), dest_node, instance.ToDict(),
                  cluster_name]
        return self._single_node_call(node, "snapshot_export", params)

    def call_finalize_export(self, node, instance, snap_disks):
        """Request the completion of an export operation.

        This writes the export config file, etc.

        This is a single-node call.

        """
        flat_disks = [disk.ToDict() for disk in snap_disks]
        params = [instance.ToDict(), flat_disks]
        return self._single_node_call(node, "finalize_export", params)

    def call_export_info(self, node, path):
        """Queries the export information in a given path.

        This is a single-node call.

        @return: the reply parsed into an
            C{objects.SerializableConfigParser}, or the reply unchanged
            if it is empty or a failure

        """
        result = self._single_node_call(node, "export_info", [path])
        if not result:
            return result
        return objects.SerializableConfigParser.Loads(str(result))

    def call_instance_os_import(self, node, inst, osdev, swapdev,
                                src_node, src_image, cluster_name):
        """Request the import of a backup into an instance.

        This is a single-node call.

        """
        params = [inst.ToDict(), osdev, swapdev, src_node, src_image,
                  cluster_name]
        return self._single_node_call(node, "instance_os_import", params)

    def call_export_list(self, node_list):
        """Gets the stored exports list.

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "export_list", [])

    def call_export_remove(self, node, export):
        """Requests removal of a given export.

        This is a single-node call.

        """
        return self._single_node_call(node, "export_remove", [export])

    def call_node_leave_cluster(self, node):
        """Requests a node to clean the cluster information it has.

        This will remove the configuration information from the ganeti
        data directory of the node.

        This is a single-node call.

        """
        return self._single_node_call(node, "node_leave_cluster", [])

    def call_node_volumes(self, node_list):
        """Gets all volumes on node(s).

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "node_volumes", [])

    def call_test_delay(self, node_list, duration):
        """Sleep for a fixed time on given node(s).

        This is a multi-node call.

        """
        return self._multi_node_call(node_list, "test_delay", [duration])

    def call_file_storage_dir_create(self, node, file_storage_dir):
        """Create the given file storage directory.

        This is a single-node call.

        """
        return self._single_node_call(node, "file_storage_dir_create",
                                      [file_storage_dir])

    def call_file_storage_dir_remove(self, node, file_storage_dir):
        """Remove the given file storage directory.

        This is a single-node call.

        """
        return self._single_node_call(node, "file_storage_dir_remove",
                                      [file_storage_dir])

    def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                     new_file_storage_dir):
        """Rename file storage directory.

        This is a single-node call.

        """
        params = [old_file_storage_dir, new_file_storage_dir]
        return self._single_node_call(node, "file_storage_dir_rename",
                                      params)

    @staticmethod
    def call_jobqueue_update(node_list, file_name, content):
        """Update job queue.

        This is a multi-node call.

        """
        return RpcRunner._multi_node_call(node_list, "jobqueue_update",
                                          [file_name, content])

    @staticmethod
    def call_jobqueue_purge(node):
        """Purge job queue.

        This is a single-node call.

        """
        return RpcRunner._single_node_call(node, "jobqueue_purge", [])

    @staticmethod
    def call_jobqueue_rename(node_list, old, new):
        """Rename a job queue file.

        This is a multi-node call.

        """
        return RpcRunner._multi_node_call(node_list, "jobqueue_rename",
                                          [old, new])