X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/06009e2796bd5bcefdbe5e666b1dc12d9f3fe50f..a237d0a89f861e284cd64c7eff3df1ef957a0dcc:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index 7a12107..f2c85db 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -26,43 +26,10 @@ # pylint: disable-msg=C0103 import os +import socket +import httplib -from twisted.internet.pollreactor import PollReactor - -class ReReactor(PollReactor): - """A re-startable Reactor implementation. - - """ - def run(self, installSignalHandlers=1): - """Custom run method. - - This is customized run that, before calling Reactor.run, will - reinstall the shutdown events and re-create the threadpool in case - these are not present (as will happen on the second run of the - reactor). - - """ - if not 'shutdown' in self._eventTriggers: - # the shutdown queue has been killed, we are most probably - # at the second run, thus recreate the queue - self.addSystemEventTrigger('during', 'shutdown', self.crash) - self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) - if self.threadpool is not None and self.threadpool.joined == 1: - # in case the threadpool has been stopped, re-start it - # and add a trigger to stop it at reactor shutdown - self.threadpool.start() - self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop) - - return PollReactor.run(self, installSignalHandlers) - - -import twisted.internet.main -twisted.internet.main.installReactor(ReReactor()) - -from twisted.spread import pb -from twisted.internet import reactor -from twisted.cred import credentials -from OpenSSL import SSL, crypto +import simplejson from ganeti import logger from ganeti import utils @@ -71,6 +38,7 @@ from ganeti import constants from ganeti import objects from ganeti import ssconf + class NodeController: """Node-handling class. @@ -82,107 +50,41 @@ class NodeController: def __init__(self, parent, node): self.parent = parent self.node = node + self.failed = False - def _check_end(self): - """Stop the reactor if we got all the results. - - """ - if len(self.parent.results) == len(self.parent.nc): - reactor.stop() - - def cb_call(self, obj): - """Callback for successful connect. - - If the connect and login sequence succeeded, we proceed with - making the actual call. - - """ - deferred = obj.callRemote(self.parent.procedure, self.parent.args) - deferred.addCallbacks(self.cb_done, self.cb_err2) - - def cb_done(self, result): - """Callback for successful call. - - When we receive the result from a call, we check if it was an - error and if so we raise a generic RemoteError (we can't pass yet - the actual exception over). If there was no error, we store the - result. - - """ - tb, self.parent.results[self.node] = result - self._check_end() - if tb: - raise errors.RemoteError("Remote procedure error calling %s on %s:" - "\n%s" % (self.parent.procedure, - self.node, - tb)) - - def cb_err1(self, reason): - """Error callback for unsuccessful connect. - - """ - logger.Error("caller_connect: could not connect to remote host %s," - " reason %s" % (self.node, reason)) - self.parent.results[self.node] = False - self._check_end() - - def cb_err2(self, reason): - """Error callback for unsuccessful call. - - This is when the call didn't return anything, not even an error, - or when it time out, etc. - - """ - logger.Error("caller_call: could not call %s on node %s," - " reason %s" % (self.parent.procedure, self.node, reason)) - self.parent.results[self.node] = False - self._check_end() - - -class MirrorContextFactory: - """Certificate verifier factory. - - This factory creates contexts that verify if the remote end has a - specific certificate (i.e. our own certificate). - - The checks we do are that the PEM dump of the certificate is the - same as our own and (somewhat redundantly) that the SHA checksum is - the same. - - """ - isClient = 1 - - def __init__(self): + self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port) try: - fd = open(constants.SSL_CERT_FILE, 'r') - try: - data = fd.read(16384) - finally: - fd.close() - except EnvironmentError, err: - raise errors.ConfigurationError("missing SSL certificate: %s" % - str(err)) - self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data) - self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert) - self.mydigest = self.mycert.digest('SHA') - - def verifier(self, conn, x509, errno, err_depth, retcode): - """Certificate verify method. + hc.connect() + hc.putrequest('PUT', "/%s" % self.parent.procedure, + skip_accept_encoding=True) + hc.putheader('Content-Length', str(len(parent.body))) + hc.endheaders() + hc.send(parent.body) + except socket.error, err: + logger.Error("Error connecting to %s: %s" % (node, str(err))) + self.failed = True + + def get_response(self): + """Try to process the response from the node. """ - if self.mydigest != x509.digest('SHA'): + if self.failed: + # we already failed in connect return False - if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem: + resp = self.http_conn.getresponse() + if resp.status != 200: return False - return True - - def getContext(self): - """Context generator. + try: + length = int(resp.getheader('Content-Length', '0')) + except ValueError: + return False + if not length: + logger.Error("Zero-length reply from %s" % self.node) + return False + payload = resp.read(length) + unload = simplejson.loads(payload) + return unload - """ - context = SSL.Context(SSL.TLSv1_METHOD) - context.set_verify(SSL.VERIFY_PEER, self.verifier) - return context class Client: """RPC Client class. @@ -208,6 +110,7 @@ class Client: self.results = {} self.procedure = procedure self.args = args + self.body = simplejson.dumps(args) #--- generic connector ------------- @@ -222,13 +125,7 @@ class Client: """Add a node to the target list. """ - factory = pb.PBClientFactory() self.nc[connect_node] = nc = NodeController(self, connect_node) - reactor.connectSSL(connect_node, self.port, factory, - MirrorContextFactory()) - #d = factory.getRootObject() - d = factory.login(credentials.UsernamePassword("master_node", self.nodepw)) - d.addCallbacks(nc.cb_call, nc.cb_err1) def getresult(self): """Return the results of the call. @@ -243,8 +140,8 @@ class Client: queued, otherwise it does nothing. """ - if self.nc: - reactor.run() + for node, nc in self.nc.items(): + self.results[node] = nc.get_response() def call_volume_list(node_list, vg_name): @@ -311,6 +208,18 @@ def call_instance_shutdown(node, instance): return c.getresult().get(node, False) +def call_instance_migrate(node, instance, target, live): + """Migrate an instance. + + This is a single-node call. + + """ + c = Client("instance_migrate", [instance.name, target, live]) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + def call_instance_reboot(node, instance, reboot_type, extra_args): """Reboots an instance. @@ -603,6 +512,19 @@ def call_blockdev_find(node, disk): return c.getresult().get(node, False) +def call_blockdev_close(node, disks): + """Closes the given block devices. + + This is a single-node call. + + """ + params = [cf.ToDict() for cf in disks] + c = Client("blockdev_close", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + def call_upload_file(node_list, file_name): """Upload a file. @@ -680,6 +602,36 @@ def call_hooks_runner(node_list, hpath, phase, env): return result +def call_iallocator_runner(node, name, idata): + """Call an iallocator on a remote node + + Args: + - name: the iallocator name + - input: the json-encoded input string + + This is a single-node call. + + """ + params = [name, idata] + c = Client("iallocator_runner", params) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + return result + + +def call_blockdev_grow(node, cf_bdev, amount): + """Request a snapshot of the given block device. + + This is a single-node call. + + """ + c = Client("blockdev_grow", [cf_bdev.ToDict(), amount]) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + def call_blockdev_snapshot(node, cf_bdev): """Request a snapshot of the given block device. @@ -735,7 +687,7 @@ def call_export_info(node, path): result = c.getresult().get(node, False) if not result: return result - return objects.SerializableConfigParser.Loads(result) + return objects.SerializableConfigParser.Loads(str(result)) def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image): @@ -813,3 +765,42 @@ def call_test_delay(node_list, duration): c.connect_list(node_list) c.run() return c.getresult() + + +def call_file_storage_dir_create(node, file_storage_dir): + """Create the given file storage directory. + + This is a single-node call. + + """ + c = Client("file_storage_dir_create", [file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + +def call_file_storage_dir_remove(node, file_storage_dir): + """Remove the given file storage directory. + + This is a single-node call. + + """ + c = Client("file_storage_dir_remove", [file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) + + +def call_file_storage_dir_rename(node, old_file_storage_dir, + new_file_storage_dir): + """Rename file storage directory. + + This is a single-node call. + + """ + c = Client("file_storage_dir_rename", + [old_file_storage_dir, new_file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) +