X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/ff5fac045a18b6e79f9ba490728a54d5c73dd666..4e071d3b89793af09f2f84304b182257221cb6b5:/daemons/ganeti-noded diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index cb72b4c..3de30d3 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -27,92 +27,72 @@ import os import sys import traceback -import BaseHTTPServer -import simplejson +import SocketServer import errno import logging +import signal from optparse import OptionParser - from ganeti import backend from ganeti import logger from ganeti import constants from ganeti import objects from ganeti import errors +from ganeti import jstore from ganeti import ssconf +from ganeti import http from ganeti import utils -_EXIT_GANETI_NODED = False +queue_lock = None + + +def _RequireJobQueueLock(fn): + """Decorator for job queue manipulating functions. + + """ + def wrapper(*args, **kwargs): + # Locking in exclusive, blocking mode because there could be several + # children running at the same time. Waiting up to 10 seconds. + queue_lock.Exclusive(blocking=True, timeout=10) + try: + return fn(*args, **kwargs) + finally: + queue_lock.Unlock() + return wrapper -class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): + +class NodeDaemonRequestHandler(http.HTTPRequestHandler): """The server implementation. This class holds all methods exposed over the RPC interface. """ - def do_PUT(self): - """Handle a post request. + def HandleRequest(self): + """Handle a request. """ + if self.command.upper() != "PUT": + raise http.HTTPBadRequest() + path = self.path if path.startswith("/"): path = path[1:] - logging.debug("ServerObject: received call '%s'", path) - mname = "perspective_%s" % path - if not hasattr(self, mname): - self.send_error(404) - return False - method = getattr(self, mname) - try: - body_length = int(self.headers.get('Content-Length', '0')) - except ValueError: - self.send_error(400, 'No Content-Length header or invalid format') - return False + method = getattr(self, "perspective_%s" % path, None) + if method is None: + raise httperror.HTTPNotFound() try: - body = self.rfile.read(body_length) - except socket.error, err: - logger.Error("Socket error while reading: %s" % str(err)) - return - try: - params = simplejson.loads(body) - logging.debug("ServerObject: method parameters: %s", params) - result = method(params) - payload = simplejson.dumps(result) + try: + return method(self.post_data) + except: + logging.exception("Error in RPC call") + raise except errors.QuitGanetiException, err: - global _EXIT_GANETI_NODED - _EXIT_GANETI_NODED = True - if isinstance(err.args, tuple) and len(err.args) == 2: - if err.args[0]: - self.send_error(500, "Error: %s" % str(err[1])) - else: - payload = simplejson.dumps(err.args[1]) - else: - self.log_message('QuitGanetiException Usage Error') - self.send_error(500, "Error: %s" % str(err)) - except Exception, err: - self.send_error(500, "Error: %s" % str(err)) - return False - self.send_response(200) - self.send_header('Content-Length', str(len(payload))) - self.end_headers() - self.wfile.write(payload) - return True - - def log_message(self, format, *args): - """Log a request to the log. - - This is the same as the parent, we just log somewhere else. - - """ - msg = ("%s - - [%s] %s" % - (self.address_string(), - self.log_date_time_string(), - format % args)) - logging.debug(msg) + # Tell parent to quit + os.kill(self.server.noded_pid, signal.SIGTERM) # the new block devices -------------------------- @@ -450,14 +430,14 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """Promote this node to master status. """ - return backend.StartMaster() + return backend.StartMaster(params[0]) @staticmethod def perspective_node_stop_master(params): """Demote this node from master status. """ - return backend.StopMaster() + return backend.StopMaster(params[0]) @staticmethod def perspective_node_leave_cluster(params): @@ -492,6 +472,12 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ return backend.UploadFile(*params) + @staticmethod + def perspective_master_info(params): + """Query master information. + + """ + return backend.GetMasterInfo() # os ----------------------- @@ -546,6 +532,8 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): duration = params[0] return utils.TestDelay(duration) + # file storage --------------- + @staticmethod def perspective_file_storage_dir_create(params): """Create the file storage directory. @@ -572,6 +560,60 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): return backend.RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir) + # jobs ------------------------ + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_update(params): + """Update job queue. + + """ + (file_name, content) = params + return backend.JobQueueUpdate(file_name, content) + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_purge(params): + """Purge job queue. + + """ + return backend.JobQueuePurge() + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_rename(params): + """Rename a job queue file. + + """ + (old, new) = params + + return backend.JobQueueRename(old, new) + + +class NodeDaemonHttpServer(http.HTTPServer): + def __init__(self, server_address): + http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler) + self.noded_pid = os.getpid() + + def serve_forever(self): + """Handle requests until told to quit.""" + sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM]) + try: + while not sighandler.called: + self.handle_request() + # TODO: There could be children running at this point + finally: + sighandler.Reset() + + +class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer): + """Forking HTTP Server. + + This inherits from ForkingMixIn and HTTPServer in order to fork for each + request we handle. This allows more requests to be handled concurrently. + + """ + def ParseOptions(): """Parse the command line options. @@ -599,6 +641,8 @@ def main(): """Main function for the node daemon. """ + global queue_lock + options, args = ParseOptions() utils.debug = options.debug for fname in (constants.SSL_CERT_FILE,): @@ -633,15 +677,24 @@ def main(): if options.fork: utils.Daemonize(logfile=constants.LOG_NODESERVER) - logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug, - stderr_logging=not options.fork) + utils.WritePidFile(constants.NODED_PID) + + logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug, + stderr_logging=not options.fork) logging.info("ganeti node daemon startup") - global _EXIT_GANETI_NODED + # Prepare job queue + queue_lock = jstore.InitAndVerifyQueue(must_lock=False) - httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject) - while (not _EXIT_GANETI_NODED): - httpd.handle_request() + if options.fork: + server = ForkingHTTPServer(('', port)) + else: + server = NodeDaemonHttpServer(('', port)) + + try: + server.serve_forever() + finally: + utils.RemovePidFile(constants.NODED_PID) if __name__ == '__main__':