from ganeti import utils
from ganeti import logger
from ganeti import errors
-from ganeti import mcpu
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
+from ganeti import ssconf
from optparse import (OptionParser, make_option, TitledHelpFormatter,
- Option, OptionValueError, SUPPRESS_HELP)
+ Option, OptionValueError)
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
- "SubmitOpCode", "SubmitJob", "SubmitQuery",
+ "SubmitOpCode", "GetClient",
"cli_option", "GenerateTable", "AskUser",
"ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
- "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
+ "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
"ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
- "FormatError", "SplitNodeOption"
+ "FormatError", "SplitNodeOption", "SubmitOrSend",
+ "JobSubmittedException",
]
TAG_SRC_OPT = make_option("--from", dest="tags_source",
default=None, help="File with tag names")
+SUBMIT_OPT = make_option("--submit", dest="submit_only",
+ default=False, action="store_true",
+ help="Submit the job and return the job ID, but"
+ " don't wait for the job to finish")
+
def ARGS_FIXED(val):
"""Macro-like function denoting a fixed number of arguments"""
return answer
-def SubmitOpCode(op, proc=None, feedback_fn=None):
- """Function to submit an opcode.
+class JobSubmittedException(Exception):
+ """Job was submitted, client should exit.
- This is just a simple wrapper over the construction of the processor
- instance. It should be extended to better handle feedback and
- interaction functions.
+ This exception has one argument, the ID of the job that was
+ submitted. The handler should print this ID.
+
+ This is not an error, just a structured way to exit from clients.
+
+ It is raised by SubmitOrSend when only submission was requested, and
+ FormatError turns it into a "JobID: ..." message with a zero return
+ code.
"""
- # TODO: Fix feedback_fn situation.
- cl = luxi.Client()
- job = opcodes.Job(op_list=[op])
- jid = SubmitJob(job)
- query = {
- "object": "jobs",
- "fields": ["status"],
- "names": [jid],
- }
+
+def SendJob(ops, cl=None):
+  """Submit a list of opcodes as a job, without waiting for the result.
+
+  @type ops: list
+  @param ops: list of opcodes
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+  @return: the value returned by the client's SubmitJob call (used as
+           the job ID by the callers in this module)
+
+  """
+  client = cl
+  if client is None:
+    client = GetClient()
+
+  return client.SubmitJob(ops)
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+ """Function to poll for the result of a job.
+
+ Waits for changes of the job via the client's WaitForJobChange call,
+ reporting each log entry received, until a poll without log entries
+ shows the status JOB_STATUS_SUCCESS or JOB_STATUS_ERROR; then queries
+ the job's "status" and "opresult" fields.
+
+ @type job_id: job identifier
+ @param job_id: the job to poll for results
+ @type cl: luxi.Client
+ @param cl: the luxi client to use for communicating with the master;
+ if None, a new client will be created
+ @type feedback_fn: callable or None
+ @param feedback_fn: if callable, invoked for every log entry with the
+ entry minus its leading serial number (log_entry[1:]); otherwise
+ each message is printed together with its formatted timestamp
+ @return: the first element of the job's "opresult" field, if the
+ final status is JOB_STATUS_SUCCESS
+ @raise errors.JobLost: if a wait or the final query returns an empty
+ result
+ @raise errors.OpExecError: with the whole "opresult" value, if the
+ final status is not JOB_STATUS_SUCCESS
+
+ """
+ if cl is None:
+ cl = GetClient()
+
+ prev_job_info = None
+ prev_logmsg_serial = None
while True:
- jdata = SubmitQuery(query)
- if not jdata:
+ result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
+ if not result:
# job not found, go away!
- raise errors.JobLost("Job with id %s lost" % jid)
+ raise errors.JobLost("Job with id %s lost" % job_id)
+
+ # Split result, a tuple of (field values, log entries)
+ (job_info, log_entries) = result
+ (status, ) = job_info
+
+ if log_entries:
+ for log_entry in log_entries:
+ (serial, timestamp, _, message) = log_entry
+ if callable(feedback_fn):
+ feedback_fn(log_entry[1:])
+ else:
+ print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+ prev_logmsg_serial = max(prev_logmsg_serial, serial)
- status = jdata[0][0]
- if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL):
+ # TODO: Handle canceled and archived jobs
+ elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
break
- time.sleep(1)
- query["fields"].extend(["op_list", "op_status", "op_result"])
- jdata = SubmitQuery(query)
- if not jdata:
- raise errors.JobLost("Job with id %s lost" % jid)
- status, op_list, op_status, op_result = jdata[0]
- if status != opcodes.Job.STATUS_SUCCESS:
- raise errors.OpExecError(op_result[0])
- return op_result[0]
+ prev_job_info = job_info
+ jobs = cl.QueryJobs([job_id], ["status", "opresult"])
+ if not jobs:
+ raise errors.JobLost("Job with id %s lost" % job_id)
-def SubmitJob(job, cl=None):
- if cl is None:
- cl = luxi.Client()
- return cl.SubmitJob(job)
+ status, result = jobs[0]
+ if status == constants.JOB_STATUS_SUCCESS:
+ return result[0]
+ else:
+ raise errors.OpExecError(result)
-def SubmitQuery(data, cl=None):
+def SubmitOpCode(op, cl=None, feedback_fn=None):
+ """Legacy function to submit an opcode.
+
+ This is just a simple wrapper that sends the opcode as a
+ single-opcode job via SendJob and then waits for its result with
+ PollJob.
+
+ @param op: the opcode to submit
+ @type cl: luxi.Client
+ @param cl: the luxi client to use; if None, a new client is
+ obtained via GetClient
+ @param feedback_fn: passed through to PollJob
+ @return: the value returned by PollJob
+
+ """
if cl is None:
- cl = luxi.Client()
- return cl.Query(data)
+ cl = GetClient()
+
+ job_id = SendJob([op], cl)
+
+ return PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
+
+
+def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
+  """Run an opcode either synchronously or in submit-only mode.
+
+  When 'opts' is set and its 'submit_only' attribute is true (the
+  '--submit' option), the opcode is only sent as a job and
+  JobSubmittedException is raised carrying the job ID, so that the
+  caller can report it and exit. Otherwise the opcode is submitted via
+  SubmitOpCode and the result of that call is returned.
+
+  """
+  submit_only = opts and opts.submit_only
+  if not submit_only:
+    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+
+  # Submit-only mode: hand the job ID to the caller via the exception
+  raise JobSubmittedException(SendJob([op], cl=cl))
+
+
+def GetClient():
+  """Create a luxi client for talking to the master daemon.
+
+  If creating the client fails with luxi.NoMasterError, the master
+  node name is looked up through ssconf: when it differs from this
+  node's name, an errors.OpPrereqError naming the master is raised
+  instead; when they are equal, the original NoMasterError is
+  re-raised unchanged.
+
+  @return: a new luxi.Client instance
+
+  """
+  # TODO: Cache object?
+  try:
+    return luxi.Client()
+  except luxi.NoMasterError:
+    master, myself = ssconf.GetMasterAndMyself()
+    if master == myself:
+      # We are on the master node, so the daemon itself is unreachable
+      raise
+    raise errors.OpPrereqError("This is not the master node, please connect"
+                               " to node '%s' and rerun the command" %
+                               master)
def FormatError(err):
obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
obuf.write("Cannot communicate with the master daemon.\nIs it running"
- " and listening on '%s'?" % err.args[0])
+ " and listening for connections?")
elif isinstance(err, luxi.TimeoutError):
obuf.write("Timeout while talking to the master daemon. Error:\n"
"%s" % msg)
elif isinstance(err, luxi.ProtocolError):
obuf.write("Unhandled protocol error while talking to the master daemon:\n"
"%s" % msg)
+ elif isinstance(err, JobSubmittedException):
+ obuf.write("JobID: %s\n" % err.args[0])
+ retcode = 0
else:
obuf.write("Unhandled exception: %s" % msg)
return retcode, obuf.getvalue().rstrip('\n')
for key, val in override.iteritems():
setattr(options, key, val)
- logger.SetupLogging(program=binary, debug=options.debug)
+ logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
+ stderr_logging=True, program=binary)
utils.debug = options.debug