X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b459a848dc62e314e61e8ae14edd3ff6cc2b2822..3234695b0e09d78b79d2e3df076e4a9c646ef6d0:/lib/server/masterd.py

diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index 27bf561..8b52c35 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -57,6 +57,9 @@ from ganeti import bootstrap
 from ganeti import netutils
 from ganeti import objects
 from ganeti import query
+from ganeti import runtime
+from ganeti import pathutils
+from ganeti import ht
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -65,6 +68,19 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  """
+  op_summary = utils.CommaJoin(op.Summary() for op in ops)
+
+  if status:
+    logging.info("New job with id %s, summary: %s", info, op_summary)
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, op_summary)
+
+
 class ClientRequestWorker(workerpool.BaseWorker):
   # pylint: disable=W0221
   def RunTask(self, server, message, client):
@@ -125,6 +141,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
     self.server.request_workers.AddTask((self.server, message, self))
 
 
+class _MasterShutdownCheck:
+  """Logic for master daemon shutdown.
+
+  """
+  #: How long to wait between checks
+  _CHECK_INTERVAL = 5.0
+
+  #: How long to wait after all jobs are done (e.g. to give clients time to
+  #: retrieve the job status)
+  _SHUTDOWN_LINGER = 5.0
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._had_active_jobs = None
+    self._linger_timeout = None
+
+  def __call__(self, jq_prepare_result):
+    """Determines if master daemon is ready for shutdown.
+
+    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+    @rtype: None or number
+    @return: None if master daemon is ready, timeout if the check must be
+      repeated
+
+    """
+    if jq_prepare_result:
+      # Check again shortly
+      logging.info("Job queue has been notified for shutdown but is still"
+                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
+      self._had_active_jobs = True
+      return self._CHECK_INTERVAL
+
+    if not self._had_active_jobs:
+      # Can shut down as there were no active jobs on the first check
+      return None
+
+    # No jobs are running anymore, but maybe some clients want to collect some
+    # information. Give them a short amount of time.
+    if self._linger_timeout is None:
+      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+    remaining = self._linger_timeout.Remaining()
+
+    logging.info("Job queue no longer busy; shutting down master daemon"
+                 " in %s seconds", remaining)
+
+    # TODO: Should the master daemon socket be closed at this point? Doing so
+    # wouldn't affect existing connections.
+
+    if remaining < 0:
+      return None
+    else:
+      return remaining
+
+
 class MasterServer(daemon.AsyncStreamServer):
   """Master Server.
 
@@ -134,11 +207,9 @@ class MasterServer(daemon.AsyncStreamServer):
   """
   family = socket.AF_UNIX
 
-  def __init__(self, mainloop, address, uid, gid):
+  def __init__(self, address, uid, gid):
     """MasterServer constructor
 
-    @type mainloop: ganeti.daemon.Mainloop
-    @param mainloop: Mainloop used to poll for I/O events
     @param address: the unix socket address to bind the MasterServer to
     @param uid: The uid of the owner of the socket
     @param gid: The gid of the owner of the socket
@@ -150,13 +221,14 @@ class MasterServer(daemon.AsyncStreamServer):
     os.chown(temp_name, uid, gid)
     os.rename(temp_name, address)
 
-    self.mainloop = mainloop
     self.awaker = daemon.AsyncAwaker()
 
     # We'll only start threads once we've forked.
     self.context = None
     self.request_workers = None
 
+    self._shutdown_check = None
+
   def handle_connection(self, connected_socket, client_address):
     # TODO: add connection count and limit the number of open connections to a
     # maximum number to avoid breaking for lack of file descriptors or memory.
@@ -168,6 +240,15 @@ class MasterServer(daemon.AsyncStreamServer):
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
+  def WaitForShutdown(self):
+    """Prepares server for shutdown.
+
+    """
+    if self._shutdown_check is None:
+      self._shutdown_check = _MasterShutdownCheck()
+
+    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
   def server_cleanup(self):
     """Cleanup the server.
 
@@ -190,35 +271,52 @@ class ClientOps:
     self.server = server
 
   def handle_request(self, method, args): # pylint: disable=R0911
-    queue = self.server.context.jobqueue
+    context = self.server.context
+    queue = context.jobqueue
 
     # TODO: Parameter validation
+    if not isinstance(args, (tuple, list)):
+      logging.info("Received invalid arguments of type '%s'", type(args))
+      raise ValueError("Invalid arguments type '%s'" % type(args))
 
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Received new job")
-      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
-      return queue.SubmitJob(ops)
-
-    if method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Received multiple jobs")
+      logging.info("Receiving new job")
+      (job_def, ) = args
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+      job_id = queue.SubmitJob(ops)
+      _LogNewJob(True, job_id, ops)
+      return job_id
+
+    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Receiving multiple jobs")
+      (job_defs, ) = args
       jobs = []
-      for ops in args:
+      for ops in job_defs:
         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      return queue.SubmitManyJobs(jobs)
+      job_ids = queue.SubmitManyJobs(jobs)
+      for ((status, job_id), ops) in zip(job_ids, jobs):
+        _LogNewJob(status, job_id, ops)
+      return job_ids
 
     elif method == luxi.REQ_CANCEL_JOB:
-      job_id = args
+      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
+    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
+      (job_id, priority) = args
+      logging.info("Received request to change priority for job %s to %s",
+                   job_id, priority)
+      return queue.ChangeJobPriority(job_id, priority)
+
     elif method == luxi.REQ_ARCHIVE_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)
 
-    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
       (age, timeout) = args
       logging.info("Received job autoarchive request for age %s, timeout %s",
                    age, timeout)
@@ -231,25 +329,29 @@ class ClientOps:
                                      prev_log_serial, timeout)
 
     elif method == luxi.REQ_QUERY:
-      req = objects.QueryRequest.FromDict(args)
-
-      if req.what in constants.QR_VIA_OP:
-        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
-                                             filter=req.filter))
-      elif req.what == constants.QR_LOCK:
-        if req.filter is not None:
-          raise errors.OpPrereqError("Lock queries can't be filtered")
-        return self.server.context.glm.QueryLocks(req.fields)
-      elif req.what in constants.QR_VIA_LUXI:
+      (what, fields, qfilter) = args
+
+      if what in constants.QR_VIA_OP:
+        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
+                                             qfilter=qfilter))
+      elif what == constants.QR_LOCK:
+        if qfilter is not None:
+          raise errors.OpPrereqError("Lock queries can't be filtered",
+                                     errors.ECODE_INVAL)
+        return context.glm.QueryLocks(fields)
+      elif what == constants.QR_JOB:
+        return queue.QueryJobs(fields, qfilter)
+      elif what in constants.QR_VIA_LUXI:
         raise NotImplementedError
       else:
-        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
+        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                    errors.ECODE_INVAL)
 
       return result
 
     elif method == luxi.REQ_QUERY_FIELDS:
-      req = objects.QueryFieldsRequest.FromDict(args)
+      (what, fields) = args
+      req = objects.QueryFieldsRequest(what=what, fields=fields)
 
       try:
         fielddefs = query.ALL_FIELDS[req.what]
@@ -266,7 +368,7 @@ class ClientOps:
       else:
         msg = str(job_ids)
       logging.info("Received job query request for %s", msg)
-      return queue.QueryJobs(job_ids, fields)
+      return queue.OldStyleQueryJobs(job_ids, fields)
 
     elif method == luxi.REQ_QUERY_INSTANCES:
       (names, fields, use_locking) = args
@@ -297,8 +399,17 @@ class ClientOps:
       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_NETWORKS:
+      (names, fields, use_locking) = args
+      logging.info("Received network query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
+      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
+      return self._Query(op)
+
     elif method == luxi.REQ_QUERY_EXPORTS:
-      nodes, use_locking = args
+      (nodes, use_locking) = args
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
@@ -307,7 +418,7 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      fields = args
+      (fields, ) = args
       logging.info("Received config values query request for %s", fields)
       op = opcodes.OpClusterConfigQuery(output_fields=fields)
       return self._Query(op)
@@ -318,20 +429,13 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_TAGS:
-      kind, name = args
+      (kind, name) = args
       logging.info("Received tags query request")
-      op = opcodes.OpTagsGet(kind=kind, name=name)
+      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
       return self._Query(op)
 
-    elif method == luxi.REQ_QUERY_LOCKS:
-      (fields, sync) = args
-      logging.info("Received locks query request")
-      if sync:
-        raise NotImplementedError("Synchronous queries are not implemented")
-      return self.server.context.glm.OldStyleQueryLocks(fields)
-
-    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
-      drain_flag = args
+    elif method == luxi.REQ_SET_DRAIN_FLAG:
+      (drain_flag, ) = args
       logging.info("Received queue drain flag change request to %s",
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)
@@ -339,18 +443,7 @@ class ClientOps:
     elif method == luxi.REQ_SET_WATCHER_PAUSE:
       (until, ) = args
 
-      if until is None:
-        logging.info("Received request to no longer pause the watcher")
-      else:
-        if not isinstance(until, (int, float)):
-          raise TypeError("Duration must be an integer or float")
-
-        if until < time.time():
-          raise errors.GenericError("Unable to set pause end time in the past")
-
-        logging.info("Received request to pause the watcher until %s", until)
-
-      return _SetWatcherPause(until)
+      return _SetWatcherPause(context, until)
 
     else:
       logging.info("Received invalid request '%s'", method)
@@ -361,7 +454,7 @@
 
     """
     # Queries don't have a job id
-    proc = mcpu.Processor(self.server.context, None)
+    proc = mcpu.Processor(self.server.context, None, enable_locks=False)
 
     # TODO: Executing an opcode using locks will acquire them in blocking mode.
     # Consider using a timeout for retries.
@@ -392,9 +485,15 @@ class GanetiContext(object):
 
     # Locking manager
     self.glm = locking.GanetiLockManager(
-                self.cfg.GetNodeList(),
-                self.cfg.GetNodeGroupList(),
-                self.cfg.GetInstanceList())
+      self.cfg.GetNodeList(),
+      self.cfg.GetNodeGroupList(),
+      self.cfg.GetInstanceList(),
+      self.cfg.GetNetworkList())
+
+    self.cfg.SetContext(self)
+
+    # RPC runner
+    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
 
     # Job queue
     self.jobqueue = jqueue.JobQueue(self)
@@ -421,6 +520,7 @@ class GanetiContext(object):
 
     # Add the new node to the Ganeti Lock Manager
     self.glm.add(locking.LEVEL_NODE, node.name)
+    self.glm.add(locking.LEVEL_NODE_RES, node.name)
 
   def ReaddNode(self, node):
     """Updates a node that's already in the configuration
@@ -441,20 +541,39 @@ class GanetiContext(object):
 
     # Remove the node from the Ganeti Lock Manager
     self.glm.remove(locking.LEVEL_NODE, name)
+    self.glm.remove(locking.LEVEL_NODE_RES, name)
 
 
-def _SetWatcherPause(until):
+def _SetWatcherPause(context, until):
   """Creates or removes the watcher pause file.
 
+  @type context: L{GanetiContext}
+  @param context: Global Ganeti context
   @type until: None or int
   @param until: Unix timestamp saying until when the watcher shouldn't run
 
   """
+  node_names = context.cfg.GetNodeList()
+
   if until is None:
-    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+    logging.info("Received request to no longer pause watcher")
   else:
-    utils.WriteFile(constants.WATCHER_PAUSEFILE,
-                    data="%d\n" % (until, ))
+    if not ht.TNumber(until):
+      raise TypeError("Duration must be numeric")
+
+    if until < time.time():
+      raise errors.GenericError("Unable to set pause end time in the past")
+
+    logging.info("Received request to pause watcher until %s", until)
+
+  result = context.rpc.call_set_watcher_pause(node_names, until)
+
+  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
+                           for (node_name, nres) in result.items()
+                           if nres.fail_msg and not nres.offline)
+  if errmsg:
+    raise errors.OpExecError("Watcher pause was set where possible, but failed"
+                             " on the following node(s): %s" % errmsg)
 
   return until
 
@@ -523,8 +642,13 @@ def CheckAgreement():
 @rpc.RunWithRPC
 def ActivateMasterIP():
   # activate ip
-  master_node = ssconf.SimpleStore().GetMasterNode()
-  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+  cfg = config.ConfigWriter()
+  master_params = cfg.GetMasterNetworkParameters()
+  ems = cfg.GetUseExternalMipScript()
+  runner = rpc.BootstrapRunner()
+  result = runner.call_node_activate_master_ip(master_params.name,
+                                               master_params, ems)
+
   msg = result.fail_msg
   if msg:
     logging.error("Can't activate master IP address: %s", msg)
@@ -548,6 +672,9 @@ def CheckMasterd(options, args):
        (constants.MASTERD_USER, constants.DAEMONS_GROUP))
     sys.exit(constants.EXIT_FAILURE)
 
+  # Determine static runtime architecture information
+  runtime.InitArchInfo()
+
   # Check the configuration is sane before anything else
   try:
     config.ConfigWriter()
@@ -605,11 +732,10 @@ def PrepMasterd(options, _):
   """
   # This is safe to do as the pid file guarantees against
   # concurrent execution.
-  utils.RemoveFile(constants.MASTER_SOCKET)
+  utils.RemoveFile(pathutils.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(mainloop, constants.MASTER_SOCKET,
-                        options.uid, options.gid)
+  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
   return (mainloop, master)
 
 
@@ -623,13 +749,15 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
     try:
       master.setup_queue()
       try:
-        mainloop.Run()
+        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
       finally:
         master.server_cleanup()
     finally:
       rpc.Shutdown()
   finally:
-    utils.RemoveFile(constants.MASTER_SOCKET)
+    utils.RemoveFile(pathutils.MASTER_SOCKET)
+
+  logging.info("Clean master daemon shutdown")
 
 
 def Main():
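
The patch above wires a new shutdown protocol into the master daemon: Mainloop.Run() is given a shutdown_wait_fn, which returns None when the daemon may exit or a delay in seconds after which the check should be repeated, and _MasterShutdownCheck adds a short "linger" period after the last job finishes. The following is a minimal, standalone sketch of that callback contract, not part of the patch; the names ShutdownCheck, run_until_shutdown and queue_busy_fn are illustrative only, and the real logic lives in ganeti.daemon, ganeti.utils and the classes shown in the diff.

import time


class ShutdownCheck(object):
  """Mimics _MasterShutdownCheck: wait for jobs to finish, then linger briefly."""

  _CHECK_INTERVAL = 5.0
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    self._had_active_jobs = None
    self._linger_end = None

  def __call__(self, queue_busy):
    if queue_busy:
      # Jobs are still running; ask the main loop to check again later
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Nothing was running on the first check; shut down immediately
      return None

    # Jobs just finished; linger so clients can still fetch final job status
    if self._linger_end is None:
      self._linger_end = time.time() + self._SHUTDOWN_LINGER

    remaining = self._linger_end - time.time()
    return None if remaining <= 0 else remaining


def run_until_shutdown(shutdown_wait_fn, queue_busy_fn):
  """Simplified stand-in for daemon.Mainloop.Run() with shutdown_wait_fn."""
  while True:
    delay = shutdown_wait_fn(queue_busy_fn())
    if delay is None:
      break
    time.sleep(delay)


if __name__ == "__main__":
  # With no jobs running, the loop exits on the first check
  run_until_shutdown(ShutdownCheck(), lambda: False)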