X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/83c046a243ace4f073afccde018b56622f29a4c7..3234695b0e09d78b79d2e3df076e4a9c646ef6d0:/lib/server/masterd.py diff --git a/lib/server/masterd.py b/lib/server/masterd.py index 94c60f4..8b52c35 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -58,6 +58,8 @@ from ganeti import netutils from ganeti import objects from ganeti import query from ganeti import runtime +from ganeti import pathutils +from ganeti import ht CLIENT_REQUEST_WORKERS = 16 @@ -66,6 +68,19 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR +def _LogNewJob(status, info, ops): + """Log information about a recently submitted job. + + """ + op_summary = utils.CommaJoin(op.Summary() for op in ops) + + if status: + logging.info("New job with id %s, summary: %s", info, op_summary) + else: + logging.info("Failed to submit job, reason: '%s', summary: %s", + info, op_summary) + + class ClientRequestWorker(workerpool.BaseWorker): # pylint: disable=W0221 def RunTask(self, server, message, client): @@ -267,22 +282,35 @@ class ClientOps: # TODO: Rewrite to not exit in each 'if/elif' branch if method == luxi.REQ_SUBMIT_JOB: - logging.info("Received new job") - ops = [opcodes.OpCode.LoadOpCode(state) for state in args] - return queue.SubmitJob(ops) - - if method == luxi.REQ_SUBMIT_MANY_JOBS: - logging.info("Received multiple jobs") + logging.info("Receiving new job") + (job_def, ) = args + ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def] + job_id = queue.SubmitJob(ops) + _LogNewJob(True, job_id, ops) + return job_id + + elif method == luxi.REQ_SUBMIT_MANY_JOBS: + logging.info("Receiving multiple jobs") + (job_defs, ) = args jobs = [] - for ops in args: + for ops in job_defs: jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops]) - return queue.SubmitManyJobs(jobs) + job_ids = queue.SubmitManyJobs(jobs) + for ((status, job_id), ops) in zip(job_ids, jobs): + _LogNewJob(status, job_id, ops) + return job_ids elif method == luxi.REQ_CANCEL_JOB: (job_id, ) = args logging.info("Received job cancel request for %s", job_id) return queue.CancelJob(job_id) + elif method == luxi.REQ_CHANGE_JOB_PRIORITY: + (job_id, priority) = args + logging.info("Received request to change priority for job %s to %s", + job_id, priority) + return queue.ChangeJobPriority(job_id, priority) + elif method == luxi.REQ_ARCHIVE_JOB: (job_id, ) = args logging.info("Received job archive request for %s", job_id) @@ -308,7 +336,8 @@ class ClientOps: qfilter=qfilter)) elif what == constants.QR_LOCK: if qfilter is not None: - raise errors.OpPrereqError("Lock queries can't be filtered") + raise errors.OpPrereqError("Lock queries can't be filtered", + errors.ECODE_INVAL) return context.glm.QueryLocks(fields) elif what == constants.QR_JOB: return queue.QueryJobs(fields, qfilter) @@ -370,6 +399,15 @@ class ClientOps: op = opcodes.OpGroupQuery(names=names, output_fields=fields) return self._Query(op) + elif method == luxi.REQ_QUERY_NETWORKS: + (names, fields, use_locking) = args + logging.info("Received network query request for %s", names) + if use_locking: + raise errors.OpPrereqError("Sync queries are not allowed", + errors.ECODE_INVAL) + op = opcodes.OpNetworkQuery(names=names, output_fields=fields) + return self._Query(op) + elif method == luxi.REQ_QUERY_EXPORTS: (nodes, use_locking) = args if use_locking: @@ -405,18 +443,7 @@ class ClientOps: elif method == luxi.REQ_SET_WATCHER_PAUSE: (until, ) = args - if until is None: - logging.info("Received request to no longer pause the watcher") - else: - if not isinstance(until, (int, float)): - raise TypeError("Duration must be an integer or float") - - if until < time.time(): - raise errors.GenericError("Unable to set pause end time in the past") - - logging.info("Received request to pause the watcher until %s", until) - - return _SetWatcherPause(until) + return _SetWatcherPause(context, until) else: logging.info("Received invalid request '%s'", method) @@ -458,9 +485,10 @@ class GanetiContext(object): # Locking manager self.glm = locking.GanetiLockManager( - self.cfg.GetNodeList(), - self.cfg.GetNodeGroupList(), - self.cfg.GetInstanceList()) + self.cfg.GetNodeList(), + self.cfg.GetNodeGroupList(), + self.cfg.GetInstanceList(), + self.cfg.GetNetworkList()) self.cfg.SetContext(self) @@ -516,18 +544,36 @@ class GanetiContext(object): self.glm.remove(locking.LEVEL_NODE_RES, name) -def _SetWatcherPause(until): +def _SetWatcherPause(context, until): """Creates or removes the watcher pause file. + @type context: L{GanetiContext} + @param context: Global Ganeti context @type until: None or int @param until: Unix timestamp saying until when the watcher shouldn't run """ + node_names = context.cfg.GetNodeList() + if until is None: - utils.RemoveFile(constants.WATCHER_PAUSEFILE) + logging.info("Received request to no longer pause watcher") else: - utils.WriteFile(constants.WATCHER_PAUSEFILE, - data="%d\n" % (until, )) + if not ht.TNumber(until): + raise TypeError("Duration must be numeric") + + if until < time.time(): + raise errors.GenericError("Unable to set pause end time in the past") + + logging.info("Received request to pause watcher until %s", until) + + result = context.rpc.call_set_watcher_pause(node_names, until) + + errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg) + for (node_name, nres) in result.items() + if nres.fail_msg and not nres.offline) + if errmsg: + raise errors.OpExecError("Watcher pause was set where possible, but failed" + " on the following node(s): %s" % errmsg) return until @@ -686,10 +732,10 @@ def PrepMasterd(options, _): """ # This is safe to do as the pid file guarantees against # concurrent execution. - utils.RemoveFile(constants.MASTER_SOCKET) + utils.RemoveFile(pathutils.MASTER_SOCKET) mainloop = daemon.Mainloop() - master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid) + master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid) return (mainloop, master) @@ -709,7 +755,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613 finally: rpc.Shutdown() finally: - utils.RemoveFile(constants.MASTER_SOCKET) + utils.RemoveFile(pathutils.MASTER_SOCKET) logging.info("Clean master daemon shutdown")