Remove a TODO
[ganeti-local] / lib / cli.py
index 00c6dfd..1932a00 100644 (file)
@@ -37,6 +37,7 @@ from ganeti import luxi
 from ganeti import ssconf
 from ganeti import rpc
 from ganeti import ssh
+from ganeti import compat
 
 from optparse import (OptionParser, TitledHelpFormatter,
                       Option, OptionValueError)
@@ -44,6 +45,7 @@ from optparse import (OptionParser, TitledHelpFormatter,
 
 __all__ = [
   # Command line options
+  "ADD_UIDS_OPT",
   "ALLOCATABLE_OPT",
   "ALL_OPT",
   "AUTO_PROMOTE_OPT",
@@ -72,6 +74,7 @@ __all__ = [
   "HVOPTS_OPT",
   "HYPERVISOR_OPT",
   "IALLOCATOR_OPT",
+  "IDENTIFY_DEFAULTS_OPT",
   "IGNORE_CONSIST_OPT",
   "IGNORE_FAILURES_OPT",
   "IGNORE_REMOVE_FAILURES_OPT",
@@ -114,6 +117,8 @@ __all__ = [
   "READD_OPT",
   "REBOOT_TYPE_OPT",
   "REMOVE_INSTANCE_OPT",
+  "REMOVE_UIDS_OPT",
+  "ROMAN_OPT",
   "SECONDARY_IP_OPT",
   "SELECT_OS_OPT",
   "SEP_OPT",
@@ -127,6 +132,7 @@ __all__ = [
   "SYNC_OPT",
   "TAG_SRC_OPT",
   "TIMEOUT_OPT",
+  "UIDPOOL_OPT",
   "USEUNITS_OPT",
   "USE_REPL_NET_OPT",
   "VERBOSE_OPT",
@@ -948,6 +954,36 @@ MAINTAIN_NODE_HEALTH_OPT = \
                " health, by shutting down unknown instances, shutting down"
                " unknown DRBD devices, etc.")
 
+IDENTIFY_DEFAULTS_OPT = \
+    cli_option("--identify-defaults", dest="identify_defaults",
+               default=False, action="store_true",
+               help="Identify which saved instance parameters are equal to"
+               " the current cluster defaults and set them as such, instead"
+               " of marking them as overridden")
+
+UIDPOOL_OPT = cli_option("--uid-pool", default=None,
+                         action="store", dest="uid_pool",
+                         help=("A list of user-ids or user-id"
+                               " ranges separated by commas"))
+
+ADD_UIDS_OPT = cli_option("--add-uids", default=None,
+                          action="store", dest="add_uids",
+                          help=("A list of user-ids or user-id"
+                                " ranges separated by commas, to be"
+                                " added to the user-id pool"))
+
+REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
+                             action="store", dest="remove_uids",
+                             help=("A list of user-ids or user-id"
+                                   " ranges separated by commas, to be"
+                                   " removed from the user-id pool"))
+
+ROMAN_OPT = cli_option("--roman",
+                       dest="roman_integers", default=False,
+                       action="store_true",
+                       help="Use roman numbers for positive integers")
+
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -1221,41 +1257,31 @@ def SendJob(ops, cl=None):
   return job_id
 
 
-def PollJob(job_id, cl=None, feedback_fn=None):
-  """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+  """Generic job-polling function.
 
-  @type job_id: job identified
-  @param job_id: the job to poll for results
-  @type cl: luxi.Client
-  @param cl: the luxi client to use for communicating with the master;
-             if None, a new client will be created
+  @type job_id: number
+  @param job_id: Job ID
+  @type cbs: Instance of L{JobPollCbBase}
+  @param cbs: Data callbacks
+  @type report_cbs: Instance of L{JobPollReportCbBase}
+  @param report_cbs: Reporting callbacks
 
   """
-  if cl is None:
-    cl = GetClient()
-
   prev_job_info = None
   prev_logmsg_serial = None
 
   status = None
 
-  notified_queued = False
-  notified_waitlock = False
-
   while True:
-    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
-                                     prev_logmsg_serial)
+    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                      prev_logmsg_serial)
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
-    elif result == constants.JOB_NOTCHANGED:
-      if status is not None and not callable(feedback_fn):
-        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
-          ToStderr("Job %s is waiting in queue", job_id)
-          notified_queued = True
-        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
-          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
-          notified_waitlock = True
+
+    if result == constants.JOB_NOTCHANGED:
+      report_cbs.ReportNotChanged(job_id, status)
 
       # Wait again
       continue
@@ -1266,12 +1292,9 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     if log_entries:
       for log_entry in log_entries:
-        (serial, timestamp, _, message) = log_entry
-        if callable(feedback_fn):
-          feedback_fn(log_entry[1:])
-        else:
-          encoded = utils.SafeEncode(message)
-          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+        (serial, timestamp, log_type, message) = log_entry
+        report_cbs.ReportLogMessage(job_id, serial, timestamp,
+                                    log_type, message)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
@@ -1283,30 +1306,189 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     prev_job_info = job_info
 
-  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
   status, opstatus, result = jobs[0]
+
   if status == constants.JOB_STATUS_SUCCESS:
     return result
-  elif status in (constants.JOB_STATUS_CANCELING,
-                  constants.JOB_STATUS_CANCELED):
+
+  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
     raise errors.OpExecError("Job was canceled")
+
+  has_ok = False
+  for idx, (status, msg) in enumerate(zip(opstatus, result)):
+    if status == constants.OP_STATUS_SUCCESS:
+      has_ok = True
+    elif status == constants.OP_STATUS_ERROR:
+      errors.MaybeRaise(msg)
+
+      if has_ok:
+        raise errors.OpExecError("partial failure (opcode %d): %s" %
+                                 (idx, msg))
+
+      raise errors.OpExecError(str(msg))
+
+  # default failure mode
+  raise errors.OpExecError(result)
+
+
+class JobPollCbBase:
+  """Base class for L{GenericPollJob} callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    raise NotImplementedError()
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    @type job_ids: list of numbers
+    @param job_ids: Job IDs
+    @type fields: list of strings
+    @param fields: Fields
+
+    """
+    raise NotImplementedError()
+
+
+class JobPollReportCbBase:
+  """Base class for L{GenericPollJob} reporting callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    raise NotImplementedError()
+
+  def ReportNotChanged(self, job_id, status):
+    """Called for if a job hasn't changed in a while.
+
+    @type job_id: number
+    @param job_id: Job ID
+    @type status: string or None
+    @param status: Job status if available
+
+    """
+    raise NotImplementedError()
+
+
+class _LuxiJobPollCb(JobPollCbBase):
+  def __init__(self, cl):
+    """Initializes this class.
+
+    """
+    JobPollCbBase.__init__(self)
+    self.cl = cl
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    return self.cl.WaitForJobChangeOnce(job_id, fields,
+                                        prev_job_info, prev_log_serial)
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    """
+    return self.cl.QueryJobs(job_ids, fields)
+
+
+class FeedbackFnJobPollReportCb(JobPollReportCbBase):
+  def __init__(self, feedback_fn):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.feedback_fn = feedback_fn
+
+    assert callable(feedback_fn)
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    self.feedback_fn((timestamp, log_type, log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    # Ignore
+
+
+class StdioJobPollReportCb(JobPollReportCbBase):
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.notified_queued = False
+    self.notified_waitlock = False
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
+             utils.SafeEncode(log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    if status is None:
+      return
+
+    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
+      ToStderr("Job %s is waiting in queue", job_id)
+      self.notified_queued = True
+
+    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
+      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+      self.notified_waitlock = True
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+  """Function to poll for the result of a job.
+
+  @type job_id: job identified
+  @param job_id: the job to poll for results
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  if feedback_fn:
+    reporter = FeedbackFnJobPollReportCb(feedback_fn)
   else:
-    has_ok = False
-    for idx, (status, msg) in enumerate(zip(opstatus, result)):
-      if status == constants.OP_STATUS_SUCCESS:
-        has_ok = True
-      elif status == constants.OP_STATUS_ERROR:
-        errors.MaybeRaise(msg)
-        if has_ok:
-          raise errors.OpExecError("partial failure (opcode %d): %s" %
-                                   (idx, msg))
-        else:
-          raise errors.OpExecError(str(msg))
-    # default failure mode
-    raise errors.OpExecError(result)
+    reporter = StdioJobPollReportCb()
+
+  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
 
 
 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
@@ -1563,9 +1745,12 @@ def GenericInstanceCreate(mode, opts, args):
   elif opts.no_nics:
     # no nics
     nics = []
-  else:
+  elif mode == constants.INSTANCE_CREATE:
     # default of one nic, all auto
     nics = [{}]
+  else:
+    # mode == import
+    nics = []
 
   if opts.disk_template == constants.DT_DISKLESS:
     if opts.disks or opts.sd_size is not None:
@@ -1573,18 +1758,23 @@ def GenericInstanceCreate(mode, opts, args):
                                  " information passed")
     disks = []
   else:
-    if not opts.disks and not opts.sd_size:
+    if (not opts.disks and not opts.sd_size
+        and mode == constants.INSTANCE_CREATE):
       raise errors.OpPrereqError("No disk information specified")
     if opts.disks and opts.sd_size is not None:
       raise errors.OpPrereqError("Please use either the '--disk' or"
                                  " '-s' option")
     if opts.sd_size is not None:
       opts.disks = [(0, {"size": opts.sd_size})]
-    try:
-      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
-    except ValueError, err:
-      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
-    disks = [{}] * disk_max
+
+    if opts.disks:
+      try:
+        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
+      except ValueError, err:
+        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
+      disks = [{}] * disk_max
+    else:
+      disks = []
     for didx, ddict in opts.disks:
       didx = int(didx)
       if not isinstance(ddict, dict):
@@ -1618,12 +1808,14 @@ def GenericInstanceCreate(mode, opts, args):
     src_node = None
     src_path = None
     no_install = opts.no_install
+    identify_defaults = False
   elif mode == constants.INSTANCE_IMPORT:
     start = False
     os_type = None
     src_node = opts.src_node
     src_path = opts.src_dir
     no_install = None
+    identify_defaults = opts.identify_defaults
   else:
     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
 
@@ -1646,7 +1838,8 @@ def GenericInstanceCreate(mode, opts, args):
                                 os_type=os_type,
                                 src_node=src_node,
                                 src_path=src_path,
-                                no_install=no_install)
+                                no_install=no_install,
+                                identify_defaults=identify_defaults)
 
   SubmitOrSend(op, opts)
   return 0
@@ -2055,11 +2248,18 @@ class JobExecutor(object):
     SetGenericOpcodeOpts(ops, self.opts)
     self.queue.append((name, ops))
 
-  def SubmitPending(self):
+  def SubmitPending(self, each=False):
     """Submit all pending jobs.
 
     """
-    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+    if each:
+      results = []
+      for row in self.queue:
+        # SubmitJob will remove the success status, but raise an exception if
+        # the submission fails, so we'll notice that anyway.
+        results.append([True, self.cl.SubmitJob(row[1])])
+    else:
+      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                             self.queue)):
       self.jobs.append((idx, status, data, name))
@@ -2104,7 +2304,7 @@ class JobExecutor(object):
         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
 
     # first, remove any non-submitted jobs
-    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
     for idx, _, jid, name in failures:
       ToStderr("Failed to submit job for %s: %s", name, jid)
       results.append((idx, False, jid))