#
#
-# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from ganeti import netutils
from ganeti import objects
from ganeti import query
+from ganeti import runtime
+from ganeti import pathutils
+from ganeti import ht
CLIENT_REQUEST_WORKERS = 16
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  @type status: boolean
+  @param status: whether the submission succeeded; chooses between the
+    success and the failure log message below
+  @type info: job ID if C{status} is True, error reason otherwise
+  @param info: value interpolated into the log message
+  @type ops: list of opcodes
+  @param ops: the opcodes making up the job; only their summaries are logged
+
+  """
+  # One comma-joined line of opcode summaries, shared by both messages
+  op_summary = utils.CommaJoin(op.Summary() for op in ops)
+
+  if status:
+    logging.info("New job with id %s, summary: %s", info, op_summary)
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, op_summary)
+
+
class ClientRequestWorker(workerpool.BaseWorker):
# pylint: disable=W0221
def RunTask(self, server, message, client):
self.server = server
def handle_request(self, method, args): # pylint: disable=R0911
- queue = self.server.context.jobqueue
+ context = self.server.context
+ queue = context.jobqueue
# TODO: Parameter validation
if not isinstance(args, (tuple, list)):
logging.info("Received invalid arguments of type '%s'", type(args))
raise ValueError("Invalid arguments type '%s'" % type(args))
+ if method not in luxi.REQ_ALL:
+ logging.info("Received invalid request '%s'", method)
+ raise ValueError("Invalid operation '%s'" % method)
+
# TODO: Rewrite to not exit in each 'if/elif' branch
if method == luxi.REQ_SUBMIT_JOB:
- logging.info("Received new job")
- ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
- return queue.SubmitJob(ops)
-
- if method == luxi.REQ_SUBMIT_MANY_JOBS:
- logging.info("Received multiple jobs")
+ logging.info("Receiving new job")
+ (job_def, ) = args
+ ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+ job_id = queue.SubmitJob(ops)
+ _LogNewJob(True, job_id, ops)
+ return job_id
+
+ elif method == luxi.REQ_SUBMIT_MANY_JOBS:
+ logging.info("Receiving multiple jobs")
+ (job_defs, ) = args
jobs = []
- for ops in args:
+ for ops in job_defs:
jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
- return queue.SubmitManyJobs(jobs)
+ job_ids = queue.SubmitManyJobs(jobs)
+ for ((status, job_id), ops) in zip(job_ids, jobs):
+ _LogNewJob(status, job_id, ops)
+ return job_ids
elif method == luxi.REQ_CANCEL_JOB:
(job_id, ) = args
logging.info("Received job cancel request for %s", job_id)
return queue.CancelJob(job_id)
+ elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
+ (job_id, priority) = args
+ logging.info("Received request to change priority for job %s to %s",
+ job_id, priority)
+ return queue.ChangeJobPriority(job_id, priority)
+
elif method == luxi.REQ_ARCHIVE_JOB:
(job_id, ) = args
logging.info("Received job archive request for %s", job_id)
return queue.ArchiveJob(job_id)
- elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+ elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
(age, timeout) = args
logging.info("Received job autoarchive request for age %s, timeout %s",
age, timeout)
elif method == luxi.REQ_QUERY:
(what, fields, qfilter) = args
- req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
-
- if req.what in constants.QR_VIA_OP:
- result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
- qfilter=req.qfilter))
- elif req.what == constants.QR_LOCK:
- if req.qfilter is not None:
- raise errors.OpPrereqError("Lock queries can't be filtered")
- return self.server.context.glm.QueryLocks(req.fields)
- elif req.what in constants.QR_VIA_LUXI:
+
+ if what in constants.QR_VIA_OP:
+ result = self._Query(opcodes.OpQuery(what=what, fields=fields,
+ qfilter=qfilter))
+ elif what == constants.QR_LOCK:
+ if qfilter is not None:
+ raise errors.OpPrereqError("Lock queries can't be filtered",
+ errors.ECODE_INVAL)
+ return context.glm.QueryLocks(fields)
+ elif what == constants.QR_JOB:
+ return queue.QueryJobs(fields, qfilter)
+ elif what in constants.QR_VIA_LUXI:
raise NotImplementedError
else:
- raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
+ raise errors.OpPrereqError("Resource type '%s' unknown" % what,
errors.ECODE_INVAL)
return result
else:
msg = str(job_ids)
logging.info("Received job query request for %s", msg)
- return queue.QueryJobs(job_ids, fields)
+ return queue.OldStyleQueryJobs(job_ids, fields)
elif method == luxi.REQ_QUERY_INSTANCES:
(names, fields, use_locking) = args
op = opcodes.OpGroupQuery(names=names, output_fields=fields)
return self._Query(op)
+ elif method == luxi.REQ_QUERY_NETWORKS:
+ (names, fields, use_locking) = args
+ logging.info("Received network query request for %s", names)
+ if use_locking:
+ raise errors.OpPrereqError("Sync queries are not allowed",
+ errors.ECODE_INVAL)
+ op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
+ return self._Query(op)
+
elif method == luxi.REQ_QUERY_EXPORTS:
(nodes, use_locking) = args
if use_locking:
elif method == luxi.REQ_QUERY_TAGS:
(kind, name) = args
logging.info("Received tags query request")
- op = opcodes.OpTagsGet(kind=kind, name=name)
+ op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
return self._Query(op)
- elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
+ elif method == luxi.REQ_SET_DRAIN_FLAG:
(drain_flag, ) = args
logging.info("Received queue drain flag change request to %s",
drain_flag)
elif method == luxi.REQ_SET_WATCHER_PAUSE:
(until, ) = args
- if until is None:
- logging.info("Received request to no longer pause the watcher")
- else:
- if not isinstance(until, (int, float)):
- raise TypeError("Duration must be an integer or float")
-
- if until < time.time():
- raise errors.GenericError("Unable to set pause end time in the past")
-
- logging.info("Received request to pause the watcher until %s", until)
-
- return _SetWatcherPause(until)
+ return _SetWatcherPause(context, until)
else:
- logging.info("Received invalid request '%s'", method)
- raise ValueError("Invalid operation '%s'" % method)
+ logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
+ raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
+ " but not implemented" % method)
def _Query(self, op):
"""Runs the specified opcode and returns the result.
"""
# Queries don't have a job id
- proc = mcpu.Processor(self.server.context, None)
+ proc = mcpu.Processor(self.server.context, None, enable_locks=False)
# TODO: Executing an opcode using locks will acquire them in blocking mode.
# Consider using a timeout for retries.
# Locking manager
self.glm = locking.GanetiLockManager(
- self.cfg.GetNodeList(),
- self.cfg.GetNodeGroupList(),
- self.cfg.GetInstanceList())
+ self.cfg.GetNodeList(),
+ self.cfg.GetNodeGroupList(),
+ self.cfg.GetInstanceList(),
+ self.cfg.GetNetworkList())
self.cfg.SetContext(self)
self.glm.remove(locking.LEVEL_NODE_RES, name)
-def _SetWatcherPause(until):
+def _SetWatcherPause(context, until):
-  """Creates or removes the watcher pause file.
+  """Creates or removes the watcher pause file on all nodes.
+ @type context: L{GanetiContext}
+ @param context: Global Ganeti context
@type until: None or int
@param until: Unix timestamp saying until when the watcher shouldn't run
"""
+ node_names = context.cfg.GetNodeList()
+
if until is None:
- utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+ logging.info("Received request to no longer pause watcher")
else:
- utils.WriteFile(constants.WATCHER_PAUSEFILE,
- data="%d\n" % (until, ))
+ if not ht.TNumber(until):
+ raise TypeError("Duration must be numeric")
+
+ if until < time.time():
+ raise errors.GenericError("Unable to set pause end time in the past")
+
+ logging.info("Received request to pause watcher until %s", until)
+
+ result = context.rpc.call_set_watcher_pause(node_names, until)
+
+ errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
+ for (node_name, nres) in result.items()
+ if nres.fail_msg and not nres.offline)
+ if errmsg:
+ raise errors.OpExecError("Watcher pause was set where possible, but failed"
+ " on the following node(s): %s" % errmsg)
return until
(constants.MASTERD_USER, constants.DAEMONS_GROUP))
sys.exit(constants.EXIT_FAILURE)
+ # Determine static runtime architecture information
+ runtime.InitArchInfo()
+
# Check the configuration is sane before anything else
try:
config.ConfigWriter()
"""
# This is safe to do as the pid file guarantees against
# concurrent execution.
- utils.RemoveFile(constants.MASTER_SOCKET)
+ utils.RemoveFile(pathutils.MASTER_SOCKET)
mainloop = daemon.Mainloop()
- master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
+ master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
return (mainloop, master)
finally:
rpc.Shutdown()
finally:
- utils.RemoveFile(constants.MASTER_SOCKET)
+ utils.RemoveFile(pathutils.MASTER_SOCKET)
logging.info("Clean master daemon shutdown")