4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""RPC client used to call procedures on the node daemons over HTTP."""
26 # pylint: disable-msg=C0103
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import objects
40 """Node-handling class.
42 For each node that we speak with, we create an instance of this
43 class, so that we have a safe place to store the details of this
47 def __init__(self, parent, node):
52 self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
55 hc.putrequest('PUT', "/%s" % self.parent.procedure,
56 skip_accept_encoding=True)
57 hc.putheader('Content-Length', str(len(parent.body)))
60 except socket.error, err:
61 logger.Error("Error connecting to %s: %s" % (node, str(err)))
64 def get_response(self):
65 """Try to process the response from the node.
69 # we already failed in connect
71 resp = self.http_conn.getresponse()
72 if resp.status != 200:
75 length = int(resp.getheader('Content-Length', '0'))
79 logger.Error("Zero-length reply from %s" % self.node)
81 payload = resp.read(length)
82 unload = simplejson.loads(payload)
class Client:
    """RPC Client class.

    This class, given a (remote) method name, a list of parameters and a
    list of nodes, will contact all nodes and return a dict of results
    (key: node name, value: result). The request is sent to a node as
    soon as that node is added; the replies are collected by run().

    One current bug is that generic failure is still signalled by a
    'False' result, which is not good, as a legitimate False reply
    cannot be told apart from a failure.

    """
    def __init__(self, procedure, args):
        """Prepare a call of ``procedure`` with the JSON-encoded ``args``."""
        self.port = utils.GetNodeDaemonPort()
        self.nodepw = utils.GetNodeDaemonPassword()
        # node name -> NodeController handling that node's request
        self.nc = {}
        # node name -> decoded reply (filled in by run())
        self.results = {}
        self.procedure = procedure
        self.args = args
        self.body = simplejson.dumps(args)

    #--- generic connector -------------

    def connect_list(self, node_list):
        """Add a list of nodes to the target nodes.

        """
        for node in node_list:
            self.connect(node)

    def connect(self, connect_node):
        """Add a node to the target list.

        Creating the NodeController is what sends the request.

        """
        self.nc[connect_node] = NodeController(self, connect_node)

    def getresult(self):
        """Return the results of the call.

        The returned dict is the client's own result mapping, keyed by
        node name; it is empty until run() has been called.

        """
        return self.results

    def run(self):
        """Collect the reply of every node added so far.

        Does nothing when no node has been added.

        """
        for node, nc in self.nc.items():
            self.results[node] = nc.get_response()
def call_volume_list(node_list, vg_name):
    """Ask every node in ``node_list`` for the logical volumes of ``vg_name``.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("volume_list", [vg_name])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_vg_list(node_list):
    """Ask every node in ``node_list`` for its volume group list.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("vg_list", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_bridges_exist(node, bridges_list):
    """Check whether ``node`` has all the bridges in ``bridges_list``.

    An instance using interfaces on those bridges can only be started
    on the node if all of them are present there.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("bridges_exist", [bridges_list])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_instance_start(node, instance, extra_args):
    """Start ``instance`` on ``node``.

    The instance is sent in its ``ToDict()`` form together with
    ``extra_args``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("instance_start", [instance.ToDict(), extra_args])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_instance_shutdown(node, instance):
    """Stop ``instance`` on ``node``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("instance_shutdown", [instance.ToDict()])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_instance_migrate(node, instance, target, live):
    """Migrate ``instance`` to ``target``.

    Only the instance's name is sent, along with ``target`` and ``live``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("instance_migrate", [instance.name, target, live])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_instance_reboot(node, instance, reboot_type, extra_args):
    """Reboot ``instance`` on ``node``.

    The instance is sent in its ``ToDict()`` form, followed by
    ``reboot_type`` and ``extra_args``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    call_args = [instance.ToDict(), reboot_type, extra_args]
    rpc = Client("instance_reboot", call_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_instance_os_add(node, inst, osdev, swapdev):
    """Install an OS on the given instance.

    The instance is sent in its ``ToDict()`` form, followed by ``osdev``
    and ``swapdev``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("instance_os_add", [inst.ToDict(), osdev, swapdev])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
    """Run the OS rename script for an instance.

    The instance is sent in its ``ToDict()`` form, followed by
    ``old_name``, ``osdev`` and ``swapdev``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    call_args = [inst.ToDict(), old_name, osdev, swapdev]
    rpc = Client("instance_run_rename", call_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_instance_info(node, instance):
    """Return information about a single instance.

    ``instance`` is forwarded as given (it is not converted with
    ``ToDict()``).

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("instance_info", [instance])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_all_instances_info(node_list):
    """Return information about all instances on the given nodes.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("all_instances_info", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_instance_list(node_list):
    """Return the list of running instances on the given nodes.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("instance_list", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
    """Do a TcpPing on the remote node.

    ``source``, ``target``, ``port``, ``timeout`` and
    ``live_port_needed`` are forwarded unchanged, in that order.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    ping_args = [source, target, port, timeout, live_port_needed]
    rpc = Client("node_tcp_ping", ping_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
305 def call_node_info(node_list, vg_name):
306 """Return node information.
308 This will return memory information and volume group size and free
311 This is a multi-node call.
314 c = Client("node_info", [vg_name])
315 c.connect_list(node_list)
317 retux = c.getresult()
319 for node_name in retux:
320 ret = retux.get(node_name, False)
321 if type(ret) != dict:
322 logger.Error("could not connect to node %s" % (node_name))
326 { 'memory_total' : '-',
329 'vg_size' : 'node_unreachable',
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Add a node to the cluster.

    The six key arguments are forwarded unchanged, in signature order.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("node_add", [dsa, dsapub, rsa, rsapub, ssh, sshpub])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_node_verify(node_list, checkdict):
    """Request verification of the parameters in ``checkdict``.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("node_verify", [checkdict])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_node_start_master(node, start_daemons):
    """Tell a node to activate itself as a master.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("node_start_master", [start_daemons])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_node_stop_master(node, stop_daemons):
    """Tell a node to demote itself from master status.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("node_stop_master", [stop_daemons])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_master_info(node_list):
    """Query master info from the given nodes.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("master_info", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_version(node_list):
    """Query the version of the given nodes.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("version", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    The device is sent in its ``ToDict()`` form, followed by ``size``,
    ``owner``, ``on_primary`` and ``info``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    call_args = [bdev.ToDict(), size, owner, on_primary, info]
    rpc = Client("blockdev_create", call_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_blockdev_remove(node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("blockdev_remove", [bdev.ToDict()])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_blockdev_rename(node, devlist):
    """Request rename of the given block devices.

    ``devlist`` holds (device, uid) pairs; each device is converted with
    ``ToDict()`` and the uid is sent along unchanged. Each pair becomes
    one positional argument of the remote call.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    renames = []
    for device, unique_id in devlist:
        renames.append((device.ToDict(), unique_id))
    rpc = Client("blockdev_rename", renames)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_blockdev_assemble(node, disk, owner, on_primary):
    """Request assembling of a given block device.

    The disk is sent in its ``ToDict()`` form, followed by ``owner`` and
    ``on_primary``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("blockdev_assemble", [disk.ToDict(), owner, on_primary])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_blockdev_shutdown(node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("blockdev_shutdown", [disk.ToDict()])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_blockdev_addchildren(node, bdev, ndevs):
    """Request adding a list of children to a (mirroring) device.

    Both the parent device and every child in ``ndevs`` are sent in
    their ``ToDict()`` form.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    children = [child.ToDict() for child in ndevs]
    rpc = Client("blockdev_addchildren", [bdev.ToDict(), children])
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_blockdev_removechildren(node, bdev, ndevs):
    """Request removing a list of children from a (mirroring) device.

    Both the parent device and every child in ``ndevs`` are sent in
    their ``ToDict()`` form.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    children = [child.ToDict() for child in ndevs]
    rpc = Client("blockdev_removechildren", [bdev.ToDict(), children])
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_blockdev_getmirrorstatus(node, disks):
    """Request the status of the given (mirroring) devices.

    Every disk is converted with ``ToDict()``; each one becomes one
    positional argument of the remote call.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    disk_dicts = []
    for disk in disks:
        disk_dicts.append(disk.ToDict())
    rpc = Client("blockdev_getmirrorstatus", disk_dicts)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_blockdev_find(node, disk):
    """Request identification of a given block device.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("blockdev_find", [disk.ToDict()])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_blockdev_close(node, disks):
    """Close the given block devices.

    Every disk is converted with ``ToDict()``; each one becomes one
    positional argument of the remote call.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    disk_dicts = []
    for disk in disks:
        disk_dicts.append(disk.ToDict())
    rpc = Client("blockdev_close", disk_dicts)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_upload_file(node_list, file_name):
    """Upload a file to the given nodes.

    The node will refuse the operation in case the file is not on the
    approved file list.

    The file's contents are sent together with its name, mode, owner
    uid/gid and access/modification times as reported by ``os.stat``.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    # NOTE(review): the statements that read the file were not visible
    # in the reviewed listing; a plain read of the whole file is assumed.
    handle = open(file_name)
    try:
        data = handle.read()
    finally:
        handle.close()
    info = os.stat(file_name)
    upload_args = [
        file_name,
        data,
        info.st_mode,
        info.st_uid,
        info.st_gid,
        info.st_atime,
        info.st_mtime,
    ]
    client = Client("upload_file", upload_args)
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_os_diagnose(node_list):
    """Request a diagnose of OS definitions.

    This is a multi-node call.

    Returns a new dict keyed by node name. A node whose reply is truthy
    maps to the list of ``objects.OS`` instances built from it.

    """
    client = Client("os_diagnose", [])
    client.connect_list(node_list)
    client.run()
    raw = client.getresult()

    converted = {}
    for node_name in raw:
        os_dicts = raw[node_name]
        if os_dicts:
            converted[node_name] = [objects.OS.FromDict(d) for d in os_dicts]
        else:
            # NOTE(review): the fallback value was not visible in the
            # reviewed listing; an empty list is assumed -- confirm.
            converted[node_name] = []
    return converted
def call_os_get(node, name):
    """Return the OS definition called ``name`` from ``node``.

    This is a single-node call.

    A dict reply is converted with ``objects.OS.FromDict``; any other
    reply (including the False used when no result was recorded for the
    node) is returned unchanged.

    """
    rpc = Client("os_get", [name])
    rpc.connect(node)
    rpc.run()
    reply = rpc.getresult().get(node, False)
    if not isinstance(reply, dict):
        return reply
    return objects.OS.FromDict(reply)
def call_hooks_runner(node_list, hpath, phase, env):
    """Call the hooks runner.

    Args:
      - hpath: forwarded unchanged as the first argument
      - phase: forwarded unchanged as the second argument
      - env: a dictionary with the environment

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("hooks_runner", [hpath, phase, env])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_iallocator_runner(node, name, idata):
    """Call an iallocator on a remote node.

    Args:
      - name: the iallocator name
      - idata: the json-encoded input string

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("iallocator_runner", [name, idata])
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_blockdev_grow(node, cf_bdev, amount):
    """Request growing the given block device by ``amount``.

    The device is sent in its ``ToDict()`` form; ``amount`` is forwarded
    unchanged (its unit is not defined here).

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_blockdev_snapshot(node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("blockdev_snapshot", [cf_bdev.ToDict()])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_snapshot_export(node, snap_bdev, dest_node, instance):
    """Request the export of a given snapshot.

    The snapshot device and the instance are sent in their ``ToDict()``
    form; ``dest_node`` is forwarded unchanged between them.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    call_args = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
    rpc = Client("snapshot_export", call_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_finalize_export(node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    The instance and every disk in ``snap_disks`` are sent in their
    ``ToDict()`` form.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    disk_dicts = [snap.ToDict() for snap in snap_disks]
    rpc = Client("finalize_export", [instance.ToDict(), disk_dicts])
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_export_info(node, path):
    """Query the export information in a given path.

    This is a single-node call.

    A truthy reply is converted to a string and parsed with
    ``objects.SerializableConfigParser.Loads``; a falsy reply (including
    the False used when no result was recorded for the node) is
    returned unchanged.

    """
    rpc = Client("export_info", [path])
    rpc.connect(node)
    rpc.run()
    reply = rpc.getresult().get(node, False)
    # NOTE(review): the guard for a failed call was not visible in the
    # reviewed listing; a pass-through of falsy replies is assumed.
    if reply:
        return objects.SerializableConfigParser.Loads(str(reply))
    return reply
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
    """Request the import of a backup into an instance.

    The instance is sent in its ``ToDict()`` form, followed by
    ``osdev``, ``swapdev``, ``src_node`` and ``src_image``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    call_args = [inst.ToDict(), osdev, swapdev, src_node, src_image]
    rpc = Client("instance_os_import", call_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_export_list(node_list):
    """Get the stored exports list from the given nodes.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("export_list", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_export_remove(node, export):
    """Request removal of a given export.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("export_remove", [export])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_node_leave_cluster(node):
    """Request a node to clean the cluster information it has.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("node_leave_cluster", [])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_node_volumes(node_list):
    """Get all volumes on the given node(s).

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("node_volumes", [])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_test_delay(node_list, duration):
    """Sleep for a fixed time on the given node(s).

    ``duration`` is forwarded unchanged (its unit is not defined here).

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("test_delay", [duration])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_file_storage_dir_create(node, file_storage_dir):
    """Create the given file storage directory.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("file_storage_dir_create", [file_storage_dir])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_file_storage_dir_remove(node, file_storage_dir):
    """Remove the given file storage directory.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("file_storage_dir_remove", [file_storage_dir])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
def call_file_storage_dir_rename(node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename a file storage directory.

    The old path is sent first, then the new one.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rename_args = [old_file_storage_dir, new_file_storage_dir]
    rpc = Client("file_storage_dir_rename", rename_args)
    rpc.connect(node)
    rpc.run()
    return rpc.getresult().get(node, False)
def call_jobqueue_update(node_list, file_name, content):
    """Send a job queue file to the given nodes.

    ``file_name`` and ``content`` are forwarded unchanged to the
    'jobqueue_update' procedure.

    This is a multi-node call.

    Returns the client's result dict, keyed by node name.

    """
    client = Client("jobqueue_update", [file_name, content])
    client.connect_list(node_list)
    client.run()
    return client.getresult()
def call_jobqueue_purge(node):
    """Invoke the 'jobqueue_purge' procedure on ``node``.

    This is a single-node call.

    Returns the node's reply, or False if no result was recorded for it.

    """
    rpc = Client("jobqueue_purge", [])
    rpc.connect(node)
    rpc.run()
    replies = rpc.getresult()
    return replies.get(node, False)
841 def call_jobqueue_rename(node_list, old, new):
842 """Rename a job queue file.
844 This is a multi-node call.
847 c = Client("jobqueue_rename", [old, new])
848 c.connect_list(node_list)
850 result = c.getresult()