X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/d4d654bd4295ca628a8a9df1ebc0380b0b7269e0..415feb2ee0a760e48a21a50488f6072f8afa9cff:/lib/server/masterd.py

diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index 3f2aea1..41df24e 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -1,7 +1,7 @@
 #
 #

-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,7 +26,7 @@ inheritance from parent classes requires it.

 """

-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name ganeti-masterd

 import grp
@@ -66,7 +66,7 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


 class ClientRequestWorker(workerpool.BaseWorker):
-  # pylint: disable-msg=W0221
+  # pylint: disable=W0221
   def RunTask(self, server, message, client):
     """Process the request.

@@ -103,7 +103,7 @@ class ClientRequestWorker(workerpool.BaseWorker):
       client.send_message(reply)
       # awake the main thread so that it can write out the data.
       server.awaker.signal()
-    except: # pylint: disable-msg=W0702
+    except: # pylint: disable=W0702
       logging.exception("Send error")
       client.close_log()

@@ -113,6 +113,7 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):

   """
   _MAX_UNHANDLED = 1
+
   def __init__(self, server, connected_socket, client_address, family):
     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                  client_address,
@@ -124,6 +125,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
     self.server.request_workers.AddTask((self.server, message, self))


+class _MasterShutdownCheck:
+  """Logic for master daemon shutdown.
+
+  """
+  #: How long to wait between checks
+  _CHECK_INTERVAL = 5.0
+
+  #: How long to wait after all jobs are done (e.g. to give clients time to
+  #: retrieve the job status)
+  _SHUTDOWN_LINGER = 5.0
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._had_active_jobs = None
+    self._linger_timeout = None
+
+  def __call__(self, jq_prepare_result):
+    """Determines if master daemon is ready for shutdown.
+
+    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+    @rtype: None or number
+    @return: None if master daemon is ready, timeout if the check must be
+      repeated
+
+    """
+    if jq_prepare_result:
+      # Check again shortly
+      logging.info("Job queue has been notified for shutdown but is still"
+                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
+      self._had_active_jobs = True
+      return self._CHECK_INTERVAL
+
+    if not self._had_active_jobs:
+      # Can shut down as there were no active jobs on the first check
+      return None
+
+    # No jobs are running anymore, but maybe some clients want to collect some
+    # information. Give them a short amount of time.
+    if self._linger_timeout is None:
+      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+    remaining = self._linger_timeout.Remaining()
+
+    logging.info("Job queue no longer busy; shutting down master daemon"
+                 " in %s seconds", remaining)
+
+    # TODO: Should the master daemon socket be closed at this point? Doing so
+    # wouldn't affect existing connections.
+
+    if remaining < 0:
+      return None
+    else:
+      return remaining
+
+
 class MasterServer(daemon.AsyncStreamServer):
   """Master Server.

@@ -133,11 +191,9 @@ class MasterServer(daemon.AsyncStreamServer):
   """
   family = socket.AF_UNIX

-  def __init__(self, mainloop, address, uid, gid):
+  def __init__(self, address, uid, gid):
     """MasterServer constructor

-    @type mainloop: ganeti.daemon.Mainloop
-    @param mainloop: Mainloop used to poll for I/O events
     @param address: the unix socket address to bind the MasterServer to
     @param uid: The uid of the owner of the socket
     @param gid: The gid of the owner of the socket
@@ -149,13 +205,14 @@ class MasterServer(daemon.AsyncStreamServer):
     os.chown(temp_name, uid, gid)
     os.rename(temp_name, address)

-    self.mainloop = mainloop
     self.awaker = daemon.AsyncAwaker()

     # We'll only start threads once we've forked.
     self.context = None
     self.request_workers = None

+    self._shutdown_check = None
+
   def handle_connection(self, connected_socket, client_address):
     # TODO: add connection count and limit the number of open connections to a
     # maximum number to avoid breaking for lack of file descriptors or memory.
@@ -167,6 +224,15 @@ class MasterServer(daemon.AsyncStreamServer):
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)

+  def WaitForShutdown(self):
+    """Prepares server for shutdown.
+
+    """
+    if self._shutdown_check is None:
+      self._shutdown_check = _MasterShutdownCheck()
+
+    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
   def server_cleanup(self):
     """Cleanup the server.

@@ -188,10 +254,13 @@ class ClientOps:
   def __init__(self, server):
     self.server = server

-  def handle_request(self, method, args): # pylint: disable-msg=R0911
+  def handle_request(self, method, args): # pylint: disable=R0911
     queue = self.server.context.jobqueue

     # TODO: Parameter validation
+    if not isinstance(args, (tuple, list)):
+      logging.info("Received invalid arguments of type '%s'", type(args))
+      raise ValueError("Invalid arguments type '%s'" % type(args))

     # TODO: Rewrite to not exit in each 'if/elif' branch

@@ -208,12 +277,12 @@ class ClientOps:
       return queue.SubmitManyJobs(jobs)

     elif method == luxi.REQ_CANCEL_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)

     elif method == luxi.REQ_ARCHIVE_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)

@@ -230,16 +299,17 @@ class ClientOps:
                                      prev_log_serial, timeout)

     elif method == luxi.REQ_QUERY:
-      req = objects.QueryRequest.FromDict(args)
+      (what, fields, qfilter) = args
+      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)

-      if req.what in constants.QR_OP_QUERY:
+      if req.what in constants.QR_VIA_OP:
         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
-                                             filter=req.filter))
+                                             qfilter=req.qfilter))
       elif req.what == constants.QR_LOCK:
-        if req.filter is not None:
+        if req.qfilter is not None:
           raise errors.OpPrereqError("Lock queries can't be filtered")
         return self.server.context.glm.QueryLocks(req.fields)
-      elif req.what in constants.QR_OP_LUXI:
+      elif req.what in constants.QR_VIA_LUXI:
         raise NotImplementedError
       else:
         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
@@ -248,20 +318,16 @@ class ClientOps:
       return result

     elif method == luxi.REQ_QUERY_FIELDS:
-      req = objects.QueryFieldsRequest.FromDict(args)
+      (what, fields) = args
+      req = objects.QueryFieldsRequest(what=what, fields=fields)

-      if req.what in constants.QR_OP_QUERY:
-        result = self._Query(opcodes.OpQueryFields(what=req.what,
-                                                   fields=req.fields))
-      elif req.what == constants.QR_LOCK:
-        return query.QueryFields(query.LOCK_FIELDS, req.fields)
-      elif req.what in constants.QR_OP_LUXI:
-        raise NotImplementedError
-      else:
+      try:
+        fielddefs = query.ALL_FIELDS[req.what]
+      except KeyError:
         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                    errors.ECODE_INVAL)

-      return result
+      return query.QueryFields(fielddefs, req.fields)

     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
@@ -278,8 +344,8 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
-                                    use_locking=use_locking)
+      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
+                                   use_locking=use_locking)
       return self._Query(op)

     elif method == luxi.REQ_QUERY_NODES:
@@ -288,8 +354,8 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
-                                use_locking=use_locking)
+      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
+                               use_locking=use_locking)
       return self._Query(op)

     elif method == luxi.REQ_QUERY_GROUPS:
@@ -302,7 +368,7 @@ class ClientOps:
       return self._Query(op)

     elif method == luxi.REQ_QUERY_EXPORTS:
-      nodes, use_locking = args
+      (nodes, use_locking) = args
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
@@ -311,7 +377,7 @@ class ClientOps:
       return self._Query(op)

     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      fields = args
+      (fields, ) = args
       logging.info("Received config values query request for %s", fields)
       op = opcodes.OpClusterConfigQuery(output_fields=fields)
       return self._Query(op)
@@ -322,9 +388,9 @@ class ClientOps:
       return self._Query(op)

     elif method == luxi.REQ_QUERY_TAGS:
-      kind, name = args
+      (kind, name) = args
       logging.info("Received tags query request")
-      op = opcodes.OpGetTags(kind=kind, name=name)
+      op = opcodes.OpTagsGet(kind=kind, name=name)
       return self._Query(op)

     elif method == luxi.REQ_QUERY_LOCKS:
@@ -335,7 +401,7 @@ class ClientOps:
       return self.server.context.glm.OldStyleQueryLocks(fields)

     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
-      drain_flag = args
+      (drain_flag, ) = args
       logging.info("Received queue drain flag change request to %s",
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)

@@ -378,7 +444,7 @@ class GanetiContext(object):
   This class creates and holds common objects shared by all threads.
""" - # pylint: disable-msg=W0212 + # pylint: disable=W0212 # we do want to ensure a singleton here _instance = None @@ -400,6 +466,11 @@ class GanetiContext(object): self.cfg.GetNodeGroupList(), self.cfg.GetInstanceList()) + self.cfg.SetContext(self) + + # RPC runner + self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor) + # Job queue self.jobqueue = jqueue.JobQueue(self) @@ -425,6 +496,7 @@ class GanetiContext(object): # Add the new node to the Ganeti Lock Manager self.glm.add(locking.LEVEL_NODE, node.name) + self.glm.add(locking.LEVEL_NODE_RES, node.name) def ReaddNode(self, node): """Updates a node that's already in the configuration @@ -445,6 +517,7 @@ class GanetiContext(object): # Remove the node from the Ganeti Lock Manager self.glm.remove(locking.LEVEL_NODE, name) + self.glm.remove(locking.LEVEL_NODE_RES, name) def _SetWatcherPause(until): @@ -527,8 +600,13 @@ def CheckAgreement(): @rpc.RunWithRPC def ActivateMasterIP(): # activate ip - master_node = ssconf.SimpleStore().GetMasterNode() - result = rpc.RpcRunner.call_node_start_master(master_node, False, False) + cfg = config.ConfigWriter() + master_params = cfg.GetMasterNetworkParameters() + ems = cfg.GetUseExternalMipScript() + runner = rpc.BootstrapRunner() + result = runner.call_node_activate_master_ip(master_params.name, + master_params, ems) + msg = result.fail_msg if msg: logging.error("Can't activate master IP address: %s", msg) @@ -578,26 +656,24 @@ def CheckMasterd(options, args): # If CheckMaster didn't fail we believe we are the master, but we have to # confirm with the other nodes. if options.no_voting: - if options.yes_do_it: - return + if not options.yes_do_it: + sys.stdout.write("The 'no voting' option has been selected.\n") + sys.stdout.write("This is dangerous, please confirm by" + " typing uppercase 'yes': ") + sys.stdout.flush() - sys.stdout.write("The 'no voting' option has been selected.\n") - sys.stdout.write("This is dangerous, please confirm by" - " typing uppercase 'yes': ") - sys.stdout.flush() + confirmation = sys.stdin.readline().strip() + if confirmation != "YES": + print >> sys.stderr, "Aborting." + sys.exit(constants.EXIT_FAILURE) - confirmation = sys.stdin.readline().strip() - if confirmation != "YES": - print >> sys.stderr, "Aborting." + else: + # CheckAgreement uses RPC and threads, hence it needs to be run in + # a separate process before we call utils.Daemonize in the current + # process. + if not utils.RunInSeparateProcess(CheckAgreement): sys.exit(constants.EXIT_FAILURE) - return - - # CheckAgreement uses RPC and threads, hence it needs to be run in a separate - # process before we call utils.Daemonize in the current process. - if not utils.RunInSeparateProcess(CheckAgreement): - sys.exit(constants.EXIT_FAILURE) - # ActivateMasterIP also uses RPC/threads, so we run it again via a # separate process. @@ -614,12 +690,11 @@ def PrepMasterd(options, _): utils.RemoveFile(constants.MASTER_SOCKET) mainloop = daemon.Mainloop() - master = MasterServer(mainloop, constants.MASTER_SOCKET, - options.uid, options.gid) + master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid) return (mainloop, master) -def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613 +def ExecMasterd(options, args, prep_data): # pylint: disable=W0613 """Main master daemon function, executed with the PID file held. 
""" @@ -629,7 +704,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613 try: master.setup_queue() try: - mainloop.Run() + mainloop.Run(shutdown_wait_fn=master.WaitForShutdown) finally: master.server_cleanup() finally: @@ -637,6 +712,8 @@ def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613 finally: utils.RemoveFile(constants.MASTER_SOCKET) + logging.info("Clean master daemon shutdown") + def Main(): """Main function"""