X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e07f7f7a02a1d83a8faa0d937e412c53558acbdc..3234695b0e09d78b79d2e3df076e4a9c646ef6d0:/lib/server/masterd.py

diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index d92b0de..8b52c35 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -57,6 +57,9 @@ from ganeti import bootstrap
 from ganeti import netutils
 from ganeti import objects
 from ganeti import query
+from ganeti import runtime
+from ganeti import pathutils
+from ganeti import ht
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -65,6 +68,19 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  """
+  op_summary = utils.CommaJoin(op.Summary() for op in ops)
+
+  if status:
+    logging.info("New job with id %s, summary: %s", info, op_summary)
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, op_summary)
+
+
 class ClientRequestWorker(workerpool.BaseWorker):
   # pylint: disable=W0221
   def RunTask(self, server, message, client):
@@ -266,28 +282,41 @@ class ClientOps:
     # TODO: Rewrite to not exit in each 'if/elif' branch
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Received new job")
-      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
-      return queue.SubmitJob(ops)
-
-    if method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Received multiple jobs")
+      logging.info("Receiving new job")
+      (job_def, ) = args
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+      job_id = queue.SubmitJob(ops)
+      _LogNewJob(True, job_id, ops)
+      return job_id
+
+    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Receiving multiple jobs")
+      (job_defs, ) = args
       jobs = []
-      for ops in args:
+      for ops in job_defs:
         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      return queue.SubmitManyJobs(jobs)
+      job_ids = queue.SubmitManyJobs(jobs)
+      for ((status, job_id), ops) in zip(job_ids, jobs):
+        _LogNewJob(status, job_id, ops)
+      return job_ids
 
     elif method == luxi.REQ_CANCEL_JOB:
       (job_id, ) = args
       logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
+    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
+      (job_id, priority) = args
+      logging.info("Received request to change priority for job %s to %s",
+                   job_id, priority)
+      return queue.ChangeJobPriority(job_id, priority)
+
     elif method == luxi.REQ_ARCHIVE_JOB:
       (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)
 
-    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
       (age, timeout) = args
       logging.info("Received job autoarchive request for age %s, timeout %s",
                    age, timeout)
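
Note: with this change the REQ_SUBMIT_JOB payload is a one-element tuple
wrapping the opcode list, and both submission paths report their outcome
through the new _LogNewJob helper. Below is a minimal, self-contained sketch
of that logging behaviour; FakeOpCode and comma_join are illustrative
stand-ins for ganeti.opcodes.OpCode and ganeti.utils.CommaJoin, not the real
implementations.

import logging

logging.basicConfig(level=logging.INFO)

def comma_join(items):
  # Stand-in for ganeti.utils.CommaJoin
  return ", ".join(str(i) for i in items)

class FakeOpCode:
  # Stand-in for ganeti.opcodes.OpCode; only Summary() matters here
  def __init__(self, summary):
    self._summary = summary

  def Summary(self):
    return self._summary

def log_new_job(status, info, ops):
  # Mirrors the new _LogNewJob helper: one log line per submission,
  # covering both the success and the failure path
  op_summary = comma_join(op.Summary() for op in ops)
  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)

log_new_job(True, 42, [FakeOpCode("INSTANCE_STARTUP(inst1)")])
log_new_job(False, "queue drained", [FakeOpCode("INSTANCE_SHUTDOWN(inst1)")])
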
@@ -301,21 +330,21 @@ class ClientOps:
     elif method == luxi.REQ_QUERY:
       (what, fields, qfilter) = args
-      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
-
-      if req.what in constants.QR_VIA_OP:
-        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
-                                             qfilter=req.qfilter))
-      elif req.what == constants.QR_LOCK:
-        if req.qfilter is not None:
-          raise errors.OpPrereqError("Lock queries can't be filtered")
-        return context.glm.QueryLocks(req.fields)
-      elif req.what == constants.QR_JOB:
-        return queue.QueryJobs(req.fields, req.qfilter)
-      elif req.what in constants.QR_VIA_LUXI:
+
+      if what in constants.QR_VIA_OP:
+        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
+                                             qfilter=qfilter))
+      elif what == constants.QR_LOCK:
+        if qfilter is not None:
+          raise errors.OpPrereqError("Lock queries can't be filtered",
+                                     errors.ECODE_INVAL)
+        return context.glm.QueryLocks(fields)
+      elif what == constants.QR_JOB:
+        return queue.QueryJobs(fields, qfilter)
+      elif what in constants.QR_VIA_LUXI:
         raise NotImplementedError
       else:
-        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
+        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                    errors.ECODE_INVAL)
 
       return result
@@ -370,6 +399,15 @@ class ClientOps:
       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_NETWORKS:
+      (names, fields, use_locking) = args
+      logging.info("Received network query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
+      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
+      return self._Query(op)
+
     elif method == luxi.REQ_QUERY_EXPORTS:
       (nodes, use_locking) = args
       if use_locking:
@@ -393,10 +431,10 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_TAGS:
       (kind, name) = args
       logging.info("Received tags query request")
-      op = opcodes.OpTagsGet(kind=kind, name=name)
+      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
       return self._Query(op)
 
-    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
+    elif method == luxi.REQ_SET_DRAIN_FLAG:
       (drain_flag, ) = args
       logging.info("Received queue drain flag change request to %s",
                    drain_flag)
@@ -405,18 +443,7 @@ class ClientOps:
     elif method == luxi.REQ_SET_WATCHER_PAUSE:
       (until, ) = args
-      if until is None:
-        logging.info("Received request to no longer pause the watcher")
-      else:
-        if not isinstance(until, (int, float)):
-          raise TypeError("Duration must be an integer or float")
-
-        if until < time.time():
-          raise errors.GenericError("Unable to set pause end time in the past")
-
-        logging.info("Received request to pause the watcher until %s", until)
-
-      return _SetWatcherPause(until)
+      return _SetWatcherPause(context, until)
 
     else:
       logging.info("Received invalid request '%s'", method)
@@ -427,7 +454,7 @@ class ClientOps:
     """
     # Queries don't have a job id
-    proc = mcpu.Processor(self.server.context, None)
+    proc = mcpu.Processor(self.server.context, None, enable_locks=False)
 
     # TODO: Executing an opcode using locks will acquire them in blocking mode.
     # Consider using a timeout for retries.
@@ -458,9 +485,10 @@ class GanetiContext(object):
     # Locking manager
     self.glm = locking.GanetiLockManager(
-                self.cfg.GetNodeList(),
-                self.cfg.GetNodeGroupList(),
-                self.cfg.GetInstanceList())
+      self.cfg.GetNodeList(),
+      self.cfg.GetNodeGroupList(),
+      self.cfg.GetInstanceList(),
+      self.cfg.GetNetworkList())
 
     self.cfg.SetContext(self)
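
Note: REQ_QUERY no longer round-trips its arguments through
objects.QueryRequest; it unpacks (what, fields, qfilter) directly and routes
on `what`, and the lock-query error now carries errors.ECODE_INVAL. A
stripped-down sketch of the resulting dispatch shape follows; the QR_* values
and the injected handler callables are illustrative stand-ins for the real
ganeti constants and the opcode/lock/job-queue backends.

# Illustrative resource classes, standing in for constants.QR_VIA_OP etc.
QR_VIA_OP = frozenset(["instance", "node"])
QR_LOCK = "lock"
QR_JOB = "job"
QR_VIA_LUXI = frozenset([])

def handle_query(what, fields, qfilter,
                 run_op_query, query_locks, query_jobs):
  # Mirrors the refactored REQ_QUERY branch: route on 'what' directly
  if what in QR_VIA_OP:
    return run_op_query(what, fields, qfilter)
  elif what == QR_LOCK:
    if qfilter is not None:
      # Lock queries don't support filtering
      raise ValueError("Lock queries can't be filtered")
    return query_locks(fields)
  elif what == QR_JOB:
    return query_jobs(fields, qfilter)
  elif what in QR_VIA_LUXI:
    raise NotImplementedError
  else:
    raise ValueError("Resource type '%s' unknown" % what)

# Example: a job query is answered straight from the queue backend
print(handle_query("job", ["id", "status"], None,
                   run_op_query=None,
                   query_locks=None,
                   query_jobs=lambda fields, qfilter: [(1, "running")]))
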
@@ -516,18 +544,36 @@ class GanetiContext(object):
       self.glm.remove(locking.LEVEL_NODE_RES, name)
 
 
-def _SetWatcherPause(until):
+def _SetWatcherPause(context, until):
   """Creates or removes the watcher pause file.
 
+  @type context: L{GanetiContext}
+  @param context: Global Ganeti context
   @type until: None or int
   @param until: Unix timestamp saying until when the watcher shouldn't run
 
   """
+  node_names = context.cfg.GetNodeList()
+
   if until is None:
-    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+    logging.info("Received request to no longer pause watcher")
   else:
-    utils.WriteFile(constants.WATCHER_PAUSEFILE,
-                    data="%d\n" % (until, ))
+    if not ht.TNumber(until):
+      raise TypeError("Duration must be numeric")
+
+    if until < time.time():
+      raise errors.GenericError("Unable to set pause end time in the past")
+
+    logging.info("Received request to pause watcher until %s", until)
+
+  result = context.rpc.call_set_watcher_pause(node_names, until)
+
+  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
+                           for (node_name, nres) in result.items()
+                           if nres.fail_msg and not nres.offline)
+  if errmsg:
+    raise errors.OpExecError("Watcher pause was set where possible, but failed"
+                             " on the following node(s): %s" % errmsg)
 
   return until
@@ -626,6 +672,9 @@ def CheckMasterd(options, args):
                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
     sys.exit(constants.EXIT_FAILURE)
 
+  # Determine static runtime architecture information
+  runtime.InitArchInfo()
+
   # Check the configuration is sane before anything else
   try:
     config.ConfigWriter()
@@ -683,10 +732,10 @@ def PrepMasterd(options, _):
   """
   # This is safe to do as the pid file guarantees against
   # concurrent execution.
-  utils.RemoveFile(constants.MASTER_SOCKET)
+  utils.RemoveFile(pathutils.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
+  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
 
   return (mainloop, master)
@@ -706,7 +755,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
     finally:
       rpc.Shutdown()
   finally:
-    utils.RemoveFile(constants.MASTER_SOCKET)
+    utils.RemoveFile(pathutils.MASTER_SOCKET)
 
   logging.info("Clean master daemon shutdown")
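
Note: _SetWatcherPause now pushes the pause end time to all nodes over RPC
instead of writing a local pause file, ignores failures from offline nodes,
and folds the remaining per-node failures into a single OpExecError. A
self-contained sketch of that aggregation pattern follows; NodeResult and
RuntimeError are illustrative stand-ins for Ganeti's RPC result objects and
errors.OpExecError.

import collections

# Illustrative stand-in for a per-node RPC result
NodeResult = collections.namedtuple("NodeResult", ["fail_msg", "offline"])

def check_pause_results(result):
  # Mirrors the new error handling in _SetWatcherPause: collect failures
  # from online nodes only, then fail once with a combined message
  errmsg = ", ".join("%s (%s)" % (node_name, nres.fail_msg)
                     for (node_name, nres) in result.items()
                     if nres.fail_msg and not nres.offline)
  if errmsg:
    raise RuntimeError("Watcher pause was set where possible, but failed"
                       " on the following node(s): %s" % errmsg)

# node1 succeeded; node2 failed but is offline, so it is not reported
check_pause_results({
  "node1": NodeResult(fail_msg=None, offline=False),
  "node2": NodeResult(fail_msg="timeout", offline=True),
})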