X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f362096fbd9ebec5f1d2b33d4ef914e136e2988a..9ae49f2753cdd158edb910029f7d08c257d32ad0:/daemons/ganeti-noded diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 1736904..5388660 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -26,8 +26,10 @@ import os import sys -import resource import traceback +import BaseHTTPServer +import simplejson +import errno from optparse import OptionParser @@ -40,55 +42,72 @@ from ganeti import errors from ganeti import ssconf from ganeti import utils -from twisted.spread import pb -from twisted.internet import reactor -from twisted.cred import checkers, portal -from OpenSSL import SSL - -class ServerContextFactory: - """SSL context factory class that uses a given certificate. - - """ - @staticmethod - def getContext(): - """Return a customized context. - - The context will be set to use our certificate. - - """ - ctx = SSL.Context(SSL.TLSv1_METHOD) - ctx.use_certificate_file(constants.SSL_CERT_FILE) - ctx.use_privatekey_file(constants.SSL_CERT_FILE) - return ctx - -class ServerObject(pb.Avatar): +class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """The server implementation. This class holds all methods exposed over the RPC interface. """ - def __init__(self, name): - self.name = name - - def perspectiveMessageReceived(self, broker, message, args, kw): - """Custom message dispatching function. - - This function overrides the pb.Avatar function in order to provide - a simple form of exception passing (as text only). + def do_PUT(self): + """Handle a post request. """ - args = broker.unserialize(args, self) - kw = broker.unserialize(kw, self) - method = getattr(self, "perspective_%s" % message) - tb = None - state = None + path = self.path + if path.startswith("/"): + path = path[1:] + mname = "perspective_%s" % path + if not hasattr(self, mname): + self.send_error(404) + return False + + method = getattr(self, mname) try: - state = method(*args, **kw) - except: - tb = traceback.format_exc() + body_length = int(self.headers.get('Content-Length', '0')) + except ValueError: + self.send_error(400, 'No Content-Length header or invalid format') + return False - return broker.serialize((tb, state), self, method, args, kw) + try: + body = self.rfile.read(body_length) + except socket.error, err: + logger.Error("Socket error while reading: %s" % str(err)) + return + try: + params = simplejson.loads(body) + result = method(params) + payload = simplejson.dumps(result) + except errors.QuitGanetiException, err: + global _EXIT_GANETI_NODED + _EXIT_GANETI_NODED = True + if isinstance(err, tuple) and len(err) == 2: + if err[0]: + self.send_error(500, "Error: %s" % str(err[1])) + else: + payload = simplejson.dumps(err[1]) + else: + self.log_message('GanetiQuitException Usage Error') + self.send_error(500, "Error: %s" % str(err)) + except Exception, err: + self.send_error(500, "Error: %s" % str(err)) + return False + self.send_response(200) + self.send_header('Content-Length', str(len(payload))) + self.end_headers() + self.wfile.write(payload) + return True + + def log_message(self, format, *args): + """Log a request to the log. + + This is the same as the parent, we just log somewhere else. + + """ + msg = ("%s - - [%s] %s" % + (self.address_string(), + self.log_date_time_string(), + format % args)) + logger.Debug(msg) # the new block devices -------------------------- @@ -203,6 +222,23 @@ class ServerObject(pb.Avatar): cfbd = objects.Disk.FromDict(params[0]) return backend.SnapshotBlockDevice(cfbd) + @staticmethod + def perspective_blockdev_grow(params): + """Grow a stack of devices. + + """ + cfbd = objects.Disk.FromDict(params[0]) + amount = params[1] + return backend.GrowBlockDevice(cfbd, amount) + + @staticmethod + def perspective_blockdev_close(params): + """Closes the given block devices. + + """ + disks = [objects.Disk.FromDict(cf) for cf in params] + return backend.CloseBlockDevices(disks) + # export/import -------------------------- @staticmethod @@ -333,6 +369,14 @@ class ServerObject(pb.Avatar): return backend.StartInstance(instance, extra_args) @staticmethod + def perspective_instance_migrate(params): + """Migrates an instance. + + """ + instance, target, live = params + return backend.MigrateInstance(instance, target, live) + + @staticmethod def perspective_instance_reboot(params): """Reboot an instance. @@ -370,8 +414,8 @@ class ServerObject(pb.Avatar): """Do a TcpPing on the remote node. """ - return utils.TcpPing(params[0], params[1], params[2], - timeout=params[3], live_port_needed=params[4]) + return utils.TcpPing(params[1], params[2], timeout=params[3], + live_port_needed=params[4], source=params[0]) @staticmethod def perspective_node_info(params): @@ -476,20 +520,52 @@ class ServerObject(pb.Avatar): hr = backend.HooksRunner() return hr.RunHooks(hpath, phase, env) + # iallocator ----------------- -class MyRealm: - """Simple realm that forwards all requests to a ServerObject. + @staticmethod + def perspective_iallocator_runner(params): + """Run an iallocator script. - """ - __implements__ = portal.IRealm + """ + name, idata = params + iar = backend.IAllocatorRunner() + return iar.Run(name, idata) + + # test ----------------------- + + @staticmethod + def perspective_test_delay(params): + """Run test delay. + + """ + duration = params[0] + return utils.TestDelay(duration) + + @staticmethod + def perspective_file_storage_dir_create(params): + """Create the file storage directory. + + """ + file_storage_dir = params[0] + return backend.CreateFileStorageDir(file_storage_dir) + + @staticmethod + def perspective_file_storage_dir_remove(params): + """Remove the file storage directory. - def requestAvatar(self, avatarId, mind, *interfaces): - """Return an avatar based on our ServerObject class. + """ + file_storage_dir = params[0] + return backend.RemoveFileStorageDir(file_storage_dir) + + @staticmethod + def perspective_file_storage_dir_rename(params): + """Rename the file storage directory. """ - if pb.IPerspective not in interfaces: - raise NotImplementedError - return pb.IPerspective, ServerObject(avatarId), lambda:None + old_file_storage_dir = params[0] + new_file_storage_dir = params[1] + return backend.RenameFileStorageDir(old_file_storage_dir, + new_file_storage_dir) def ParseOptions(): @@ -533,73 +609,33 @@ def main(): print "Cluster configuration incomplete: '%s'" % str(err) sys.exit(5) + # create /var/run/ganeti if not existing, in order to take care of + # tmpfs /var/run + if not os.path.exists(constants.BDEV_CACHE_DIR): + try: + os.mkdir(constants.BDEV_CACHE_DIR, 0755) + except EnvironmentError, err: + if err.errno != errno.EEXIST: + print ("Node setup wrong, cannot create directory %s: %s" % + (constants.BDEV_CACHE_DIR, err)) + sys.exit(5) + if not os.path.isdir(constants.BDEV_CACHE_DIR): + print ("Node setup wrong, %s is not a directory" % + constants.BDEV_CACHE_DIR) + sys.exit(5) + # become a daemon if options.fork: - createDaemon() - - logger.SetupLogging(twisted_workaround=True, debug=options.debug, - program="ganeti-noded") + utils.Daemonize(logfile=constants.LOG_NODESERVER) - p = portal.Portal(MyRealm()) - p.registerChecker( - checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata)) - reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory()) - reactor.run() + logger.SetupLogging(program="ganeti-noded", debug=options.debug) + global _EXIT_GANETI_NODED + _EXIT_GANETI_NODED = False -def createDaemon(): - """Detach a process from the controlling terminal and run it in the - background as a daemon. - - """ - UMASK = 077 - WORKDIR = "/" - # Default maximum for the number of available file descriptors. - if 'SC_OPEN_MAX' in os.sysconf_names: - try: - MAXFD = os.sysconf('SC_OPEN_MAX') - if MAXFD < 0: - MAXFD = 1024 - except OSError: - MAXFD = 1024 - else: - MAXFD = 1024 - # The standard I/O file descriptors are redirected to /dev/null by default. - #REDIRECT_TO = getattr(os, "devnull", "/dev/null") - REDIRECT_TO = constants.LOG_NODESERVER - try: - pid = os.fork() - except OSError, e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - if (pid == 0): # The first child. - os.setsid() - try: - pid = os.fork() # Fork a second child. - except OSError, e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - if (pid == 0): # The second child. - os.chdir(WORKDIR) - os.umask(UMASK) - else: - # exit() or _exit()? See below. - os._exit(0) # Exit parent (the first child) of the second child. - else: - os._exit(0) # Exit parent of the first child. - maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] - if (maxfd == resource.RLIM_INFINITY): - maxfd = MAXFD - - # Iterate through and close all file descriptors. - for fd in range(0, maxfd): - try: - os.close(fd) - except OSError: # ERROR, fd wasn't open to begin with (ignored) - pass - os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600) - # Duplicate standard input to standard output and standard error. - os.dup2(0, 1) # standard output (1) - os.dup2(0, 2) # standard error (2) - return(0) + httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject) + while (not _EXIT_GANETI_NODED): + httpd.handle_request() if __name__ == '__main__':