Fix RPC result handling in _AssembleInstanceDisks
[ganeti-local] / lib / cli.py
index dde5936..0d5a5db 100644 (file)
@@ -50,9 +50,8 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
            "FormatError", "SplitNodeOption", "SubmitOrSend",
            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
-           "ValidateBeParams",
-           "ToStderr", "ToStdout",
-           "UsesRPC",
+           "ToStderr", "ToStdout", "UsesRPC",
+           "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
            ]
 
 
@@ -191,6 +190,11 @@ SUBMIT_OPT = make_option("--submit", dest="submit_only",
                          help="Submit the job and return the job ID, but"
                          " don't wait for the job to finish")
 
+SYNC_OPT = make_option("--sync", dest="do_locking",
+                       default=False, action="store_true",
+                       help="Grab locks while doing the queries"
+                       " in order to ensure more consistent results")
+
 
 def ARGS_FIXED(val):
   """Macro-like function denoting a fixed number of arguments"""
@@ -314,15 +318,15 @@ keyval_option = KeyValOption
 
 
 def _ParseArgs(argv, commands, aliases):
-  """Parses the command line and return the function which must be
-  executed together with its arguments
+  """Parser for the command line arguments.
 
-  Arguments:
-    argv: the command line
+  This function parses the arguements and returns the function which
+  must be executed together with its (modified) arguments.
 
-    commands: dictionary with special contents, see the design doc for
-    cmdline handling
-    aliases: dictionary with command aliases {'alias': 'target, ...}
+  @param argv: the command line
+  @param commands: dictionary with special contents, see the design
+      doc for cmdline handling
+  @param aliases: dictionary with command aliases {'alias': 'target, ...}
 
   """
   if len(argv) == 0:
@@ -405,27 +409,6 @@ def SplitNodeOption(value):
     return (value, None)
 
 
-def ValidateBeParams(bep):
-  """Parse and check the given beparams.
-
-  The function will update in-place the given dictionary.
-
-  @type bep: dict
-  @param bep: input beparams
-  @raise errors.ParameterError: if the input values are not OK
-  @raise errors.UnitParseError: if the input values are not OK
-
-  """
-  if constants.BE_MEMORY in bep:
-    bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
-
-  if constants.BE_VCPUS in bep:
-    try:
-      bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
-    except ValueError:
-      raise errors.ParameterError("Invalid number of VCPUs")
-
-
 def UsesRPC(fn):
   def wrapper(*args, **kwargs):
     rpc.Init()
@@ -439,17 +422,16 @@ def UsesRPC(fn):
 def AskUser(text, choices=None):
   """Ask the user a question.
 
-  Args:
-    text - the question to ask.
+  @param text: the question to ask
 
-    choices - list with elements tuples (input_char, return_value,
-    description); if not given, it will default to: [('y', True,
-    'Perform the operation'), ('n', False, 'Do no do the operation')];
-    note that the '?' char is reserved for help
+  @param choices: list with elements tuples (input_char, return_value,
+      description); if not given, it will default to: [('y', True,
+      'Perform the operation'), ('n', False, 'Do no do the operation')];
+      note that the '?' char is reserved for help
 
-  Returns: one of the return values from the choices list; if input is
-  not possible (i.e. not running with a tty, we return the last entry
-  from the list
+  @return: one of the return values from the choices list; if input is
+      not possible (i.e. not running with a tty, we return the last
+      entry from the list
 
   """
   if choices is None:
@@ -556,7 +538,8 @@ def PollJob(job_id, cl=None, feedback_fn=None):
         if callable(feedback_fn):
           feedback_fn(log_entry[1:])
         else:
-          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+          encoded = utils.SafeEncode(message)
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
@@ -688,6 +671,11 @@ def FormatError(err):
   elif isinstance(err, errors.JobQueueDrainError):
     obuf.write("Failure: the job queue is marked for drain and doesn't"
                " accept new requests\n")
+  elif isinstance(err, errors.JobQueueFull):
+    obuf.write("Failure: the job queue is full and doesn't accept new"
+               " job submissions until old jobs are archived\n")
+  elif isinstance(err, errors.TypeEnforcementError):
+    obuf.write("Parameter Error: %s" % msg)
   elif isinstance(err, errors.GenericError):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
@@ -919,6 +907,32 @@ def ParseTimespec(value):
   return value
 
 
+def GetOnlineNodes(nodes, cl=None, nowarn=False):
+  """Returns the names of online nodes.
+
+  This function will also log a warning on stderr with the names of
+  the online nodes.
+
+  @param nodes: if not empty, use only this subset of nodes (minus the
+      offline ones)
+  @param cl: if not None, luxi client to use
+  @type nowarn: boolean
+  @param nowarn: by default, this function will output a note with the
+      offline nodes that are skipped; if this parameter is True the
+      note is not displayed
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
+                         use_locking=False)
+  offline = [row[0] for row in result if row[1]]
+  if offline and not nowarn:
+    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
+  return [row[0] for row in result if not row[1]]
+
+
 def _ToStream(stream, txt, *args):
   """Write a message to a stream, bypassing the logging system
 
@@ -959,3 +973,67 @@ def ToStderr(txt, *args):
 
   """
   _ToStream(sys.stderr, txt, *args)
+
+
+class JobExecutor(object):
+  """Class which manages the submission and execution of multiple jobs.
+
+  Note that instances of this class should not be reused between
+  GetResults() calls.
+
+  """
+  def __init__(self, cl=None, verbose=True):
+    self.queue = []
+    if cl is None:
+      cl = GetClient()
+    self.cl = cl
+    self.verbose = verbose
+
+  def QueueJob(self, name, *ops):
+    """Submit a job for execution.
+
+    @type name: string
+    @param name: a description of the job, will be used in WaitJobSet
+    """
+    job_id = SendJob(ops, cl=self.cl)
+    self.queue.append((job_id, name))
+
+  def GetResults(self):
+    """Wait for and return the results of all jobs.
+
+    @rtype: list
+    @return: list of tuples (success, job results), in the same order
+        as the submitted jobs; if a job has failed, instead of the result
+        there will be the error message
+
+    """
+    results = []
+    if self.verbose:
+      ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
+    for jid, name in self.queue:
+      if self.verbose:
+        ToStdout("Waiting for job %s for %s...", jid, name)
+      try:
+        job_result = PollJob(jid, cl=self.cl)
+        success = True
+      except (errors.GenericError, luxi.ProtocolError), err:
+        _, job_result = FormatError(err)
+        success = False
+        # the error message will always be shown, verbose or not
+        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
+
+      results.append((success, job_result))
+    return results
+
+  def WaitOrShow(self, wait):
+    """Wait for job results or only print the job IDs.
+
+    @type wait: boolean
+    @param wait: whether to wait or not
+
+    """
+    if wait:
+      return self.GetResults()
+    else:
+      for jid, name in self.queue:
+        ToStdout("%s: %s", jid, name)