X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/dcb939711bb4ed751a71f522e41c9af986bfe96e..6510a58aeba9fa860c7bea3ef7554afe97a6b1c1:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index ce4d787..e3e5a73 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +# # # Copyright (C) 2006, 2007 Google Inc. @@ -30,8 +30,9 @@ import os from twisted.internet.pollreactor import PollReactor class ReReactor(PollReactor): - """A re-startable Reactor implementation""" + """A re-startable Reactor implementation. + """ def run(self, installSignalHandlers=1): """Custom run method. @@ -90,7 +91,7 @@ class NodeController: reactor.stop() def cb_call(self, obj): - """Callback for successfull connect. + """Callback for successful connect. If the connect and login sequence succeeded, we proceed with making the actual call. @@ -159,8 +160,8 @@ class MirrorContextFactory: finally: fd.close() except EnvironmentError, err: - raise errors.ConfigurationError, ("missing SSL certificate: %s" % - str(err)) + raise errors.ConfigurationError("missing SSL certificate: %s" % + str(err)) self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data) self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert) self.mydigest = self.mycert.digest('SHA') @@ -186,7 +187,7 @@ class MirrorContextFactory: class Client: """RPC Client class. - This class, given a (remote) ethod name, a list of parameters and a + This class, given a (remote) method name, a list of parameters and a list of nodes, will contact (in parallel) all nodes, and return a dict of results (key: node name, value: result). @@ -287,12 +288,12 @@ def call_bridges_exist(node, bridges_list): def call_instance_start(node, instance, extra_args): - """Stars an instance. + """Starts an instance. This is a single-node call. """ - c = Client("instance_start", [instance.Dumps(), extra_args]) + c = Client("instance_start", [instance.ToDict(), extra_args]) c.connect(node) c.run() return c.getresult().get(node, False) @@ -304,7 +305,19 @@ def call_instance_shutdown(node, instance): This is a single-node call. """ - c = Client("instance_shutdown", [instance.Dumps()]) + c = Client("instance_shutdown", [instance.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + +def call_instance_reboot(node, instance, reboot_type, extra_args): + """Reboots an instance. + + This is a single-node call. + + """ + c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args]) c.connect(node) c.run() return c.getresult().get(node, False) @@ -316,13 +329,26 @@ def call_instance_os_add(node, inst, osdev, swapdev): This is a single-node call. """ - params = [inst.Dumps(), osdev, swapdev] + params = [inst.ToDict(), osdev, swapdev] c = Client("instance_os_add", params) c.connect(node) c.run() return c.getresult().get(node, False) +def call_instance_run_rename(node, inst, old_name, osdev, swapdev): + """Run the OS rename script for an instance. + + This is a single-node call. + + """ + params = [inst.ToDict(), old_name, osdev, swapdev] + c = Client("instance_run_rename", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + def call_instance_info(node, instance): """Returns information about a single instance. @@ -359,6 +385,18 @@ def call_instance_list(node_list): return c.getresult() +def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed): + """Do a TcpPing on the remote node + + This is a single-node call. + """ + c = Client("node_tcp_ping", [source, target, port, timeout, + live_port_needed]) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + def call_node_info(node_list, vg_name): """Return node information. @@ -451,49 +489,51 @@ def call_version(node_list): return c.getresult() -def call_configfile_list(node_list): - """Return list of existing configuration files. +def call_blockdev_create(node, bdev, size, owner, on_primary, info): + """Request creation of a given block device. - This is a multi-node call. + This is a single-node call. """ - c = Client("configfile_list", []) - c.connect_list(node_list) + params = [bdev.ToDict(), size, owner, on_primary, info] + c = Client("blockdev_create", params) + c.connect(node) c.run() - return c.getresult() + return c.getresult().get(node, False) -def call_blockdev_create(node, bdev, size, on_primary): - """Request creation of a given block device. + +def call_blockdev_remove(node, bdev): + """Request removal of a given block device. This is a single-node call. """ - params = [bdev.Dumps(), size, on_primary] - c = Client("blockdev_create", params) + c = Client("blockdev_remove", [bdev.ToDict()]) c.connect(node) c.run() return c.getresult().get(node, False) -def call_blockdev_remove(node, bdev): - """Request removal of a given block device. +def call_blockdev_rename(node, devlist): + """Request rename of the given block devices. This is a single-node call. """ - c = Client("blockdev_remove", [bdev.Dumps()]) + params = [(d.ToDict(), uid) for d, uid in devlist] + c = Client("blockdev_rename", params) c.connect(node) c.run() return c.getresult().get(node, False) -def call_blockdev_assemble(node, disk, on_primary): +def call_blockdev_assemble(node, disk, owner, on_primary): """Request assembling of a given block device. This is a single-node call. """ - params = [disk.Dumps(), on_primary] + params = [disk.ToDict(), owner, on_primary] c = Client("blockdev_assemble", params) c.connect(node) c.run() @@ -506,33 +546,33 @@ def call_blockdev_shutdown(node, disk): This is a single-node call. """ - c = Client("blockdev_shutdown", [disk.Dumps()]) + c = Client("blockdev_shutdown", [disk.ToDict()]) c.connect(node) c.run() return c.getresult().get(node, False) -def call_blockdev_addchild(node, bdev, ndev): - """Request adding a new child to a (mirroring) device. +def call_blockdev_addchildren(node, bdev, ndevs): + """Request adding a list of children to a (mirroring) device. This is a single-node call. """ - params = [bdev.Dumps(), ndev.Dumps()] - c = Client("blockdev_addchild", params) + params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] + c = Client("blockdev_addchildren", params) c.connect(node) c.run() return c.getresult().get(node, False) -def call_blockdev_removechild(node, bdev, ndev): - """Request removing a new child from a (mirroring) device. +def call_blockdev_removechildren(node, bdev, ndevs): + """Request removing a list of children from a (mirroring) device. This is a single-node call. """ - params = [bdev.Dumps(), ndev.Dumps()] - c = Client("blockdev_removechild", params) + params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] + c = Client("blockdev_removechildren", params) c.connect(node) c.run() return c.getresult().get(node, False) @@ -544,7 +584,7 @@ def call_blockdev_getmirrorstatus(node, disks): This is a single-node call. """ - params = [dsk.Dumps() for dsk in disks] + params = [dsk.ToDict() for dsk in disks] c = Client("blockdev_getmirrorstatus", params) c.connect(node) c.run() @@ -557,7 +597,7 @@ def call_blockdev_find(node, disk): This is a single-node call. """ - c = Client("blockdev_find", [disk.Dumps()]) + c = Client("blockdev_find", [disk.ToDict()]) c.connect(node) c.run() return c.getresult().get(node, False) @@ -598,41 +638,28 @@ def call_os_diagnose(node_list): result = c.getresult() new_result = {} for node_name in result: - nr = [] if result[node_name]: - for data in result[node_name]: - if data: - if isinstance(data, basestring): - nr.append(objects.ConfigObject.Loads(data)) - elif isinstance(data, tuple) and len(data) == 2: - nr.append(errors.InvalidOS(data[0], data[1])) - else: - raise errors.ProgrammerError, ("Invalid data from" - " xcserver.os_diagnose") + nr = [objects.OS.FromDict(oss) for oss in result[node_name]] + else: + nr = [] new_result[node_name] = nr return new_result -def call_os_get(node_list, name): +def call_os_get(node, name): """Returns an OS definition. - This is a multi-node call. + This is a single-node call. """ c = Client("os_get", [name]) - c.connect_list(node_list) + c.connect(node) c.run() - result = c.getresult() - new_result = {} - for node_name in result: - data = result[node_name] - if isinstance(data, basestring): - new_result[node_name] = objects.ConfigObject.Loads(data) - elif isinstance(data, tuple) and len(data) == 2: - new_result[node_name] = errors.InvalidOS(data[0], data[1]) - else: - new_result[node_name] = data - return new_result + result = c.getresult().get(node, False) + if isinstance(result, dict): + return objects.OS.FromDict(result) + else: + return result def call_hooks_runner(node_list, hpath, phase, env): @@ -659,7 +686,7 @@ def call_blockdev_snapshot(node, cf_bdev): This is a single-node call. """ - c = Client("blockdev_snapshot", [cf_bdev.Dumps()]) + c = Client("blockdev_snapshot", [cf_bdev.ToDict()]) c.connect(node) c.run() return c.getresult().get(node, False) @@ -671,7 +698,7 @@ def call_snapshot_export(node, snap_bdev, dest_node, instance): This is a single-node call. """ - params = [snap_bdev.Dumps(), dest_node, instance.Dumps()] + params = [snap_bdev.ToDict(), dest_node, instance.ToDict()] c = Client("snapshot_export", params) c.connect(node) c.run() @@ -688,8 +715,8 @@ def call_finalize_export(node, instance, snap_disks): """ flat_disks = [] for disk in snap_disks: - flat_disks.append(disk.Dumps()) - params = [instance.Dumps(), flat_disks] + flat_disks.append(disk.ToDict()) + params = [instance.ToDict(), flat_disks] c = Client("finalize_export", params) c.connect(node) c.run() @@ -717,7 +744,7 @@ def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image): This is a single-node call. """ - params = [inst.Dumps(), osdev, swapdev, src_node, src_image] + params = [inst.ToDict(), osdev, swapdev, src_node, src_image] c = Client("instance_os_import", params) c.connect(node) c.run()