"ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
"FormatError", "SplitNodeOption", "SubmitOrSend",
"JobSubmittedException", "FormatTimestamp", "ParseTimespec",
- "ValidateBeParams",
- "ToStderr", "ToStdout",
- "UsesRPC",
+ "ToStderr", "ToStdout", "UsesRPC",
+ "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
]
help="Submit the job and return the job ID, but"
" don't wait for the job to finish")
+SYNC_OPT = make_option("--sync", dest="do_locking",
+ default=False, action="store_true",
+ help="Grab locks while doing the queries"
+ " in order to ensure more consistent results")
+
def ARGS_FIXED(val):
"""Macro-like function denoting a fixed number of arguments"""
return (value, None)
-def ValidateBeParams(bep):
- """Parse and check the given beparams.
-
- The function will update in-place the given dictionary.
-
- @type bep: dict
- @param bep: input beparams
- @raise errors.ParameterError: if the input values are not OK
- @raise errors.UnitParseError: if the input values are not OK
-
- """
- if constants.BE_MEMORY in bep:
- bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
-
- if constants.BE_VCPUS in bep:
- try:
- bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
- except ValueError:
- raise errors.ParameterError("Invalid number of VCPUs")
-
-
def UsesRPC(fn):
def wrapper(*args, **kwargs):
rpc.Init()
if callable(feedback_fn):
feedback_fn(log_entry[1:])
else:
- print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+ encoded = utils.SafeEncode(message)
+ print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs
elif isinstance(err, errors.JobQueueFull):
obuf.write("Failure: the job queue is full and doesn't accept new"
" job submissions until old jobs are archived\n")
+ elif isinstance(err, errors.TypeEnforcementError):
+ obuf.write("Parameter Error: %s" % msg)
elif isinstance(err, errors.GenericError):
obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
return value
+def GetOnlineNodes(nodes, cl=None, nowarn=False):
+ """Returns the names of online nodes.
+
+ This function will also log a warning on stderr with the names of
+ the online nodes.
+
+ @param nodes: if not empty, use only this subset of nodes (minus the
+ offline ones)
+ @param cl: if not None, luxi client to use
+ @type nowarn: boolean
+ @param nowarn: by default, this function will output a note with the
+ offline nodes that are skipped; if this parameter is True the
+ note is not displayed
+
+ """
+ if cl is None:
+ cl = GetClient()
+
+ result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
+ use_locking=False)
+ offline = [row[0] for row in result if row[1]]
+ if offline and not nowarn:
+ ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
+ return [row[0] for row in result if not row[1]]
+
+
def _ToStream(stream, txt, *args):
"""Write a message to a stream, bypassing the logging system
"""
_ToStream(sys.stderr, txt, *args)
+
+
+class JobExecutor(object):
+ """Class which manages the submission and execution of multiple jobs.
+
+ Note that instances of this class should not be reused between
+ GetResults() calls.
+
+ """
+ def __init__(self, cl=None, verbose=True):
+ self.queue = []
+ if cl is None:
+ cl = GetClient()
+ self.cl = cl
+ self.verbose = verbose
+
+ def QueueJob(self, name, *ops):
+ """Submit a job for execution.
+
+ @type name: string
+ @param name: a description of the job, will be used in WaitJobSet
+ """
+ job_id = SendJob(ops, cl=self.cl)
+ self.queue.append((job_id, name))
+
+ def GetResults(self):
+ """Wait for and return the results of all jobs.
+
+ @rtype: list
+ @return: list of tuples (success, job results), in the same order
+ as the submitted jobs; if a job has failed, instead of the result
+ there will be the error message
+
+ """
+ results = []
+ if self.verbose:
+ ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
+ for jid, name in self.queue:
+ if self.verbose:
+ ToStdout("Waiting for job %s for %s...", jid, name)
+ try:
+ job_result = PollJob(jid, cl=self.cl)
+ success = True
+ except (errors.GenericError, luxi.ProtocolError), err:
+ _, job_result = FormatError(err)
+ success = False
+ # the error message will always be shown, verbose or not
+ ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
+
+ results.append((success, job_result))
+ return results
+
+ def WaitOrShow(self, wait):
+ """Wait for job results or only print the job IDs.
+
+ @type wait: boolean
+ @param wait: whether to wait or not
+
+ """
+ if wait:
+ return self.GetResults()
+ else:
+ for jid, name in self.queue:
+ ToStdout("%s: %s", jid, name)