Remove a TODO
[ganeti-local] / lib / cli.py
index 5df4aae..1932a00 100644 (file)
@@ -37,6 +37,7 @@ from ganeti import luxi
 from ganeti import ssconf
 from ganeti import rpc
 from ganeti import ssh
+from ganeti import compat
 
 from optparse import (OptionParser, TitledHelpFormatter,
                       Option, OptionValueError)
@@ -44,12 +45,14 @@ from optparse import (OptionParser, TitledHelpFormatter,
 
 __all__ = [
   # Command line options
+  "ADD_UIDS_OPT",
   "ALLOCATABLE_OPT",
   "ALL_OPT",
   "AUTO_PROMOTE_OPT",
   "AUTO_REPLACE_OPT",
   "BACKEND_OPT",
   "CLEANUP_OPT",
+  "CLUSTER_DOMAIN_SECRET_OPT",
   "CONFIRM_OPT",
   "CP_SIZE_OPT",
   "DEBUG_OPT",
@@ -71,15 +74,19 @@ __all__ = [
   "HVOPTS_OPT",
   "HYPERVISOR_OPT",
   "IALLOCATOR_OPT",
+  "IDENTIFY_DEFAULTS_OPT",
   "IGNORE_CONSIST_OPT",
   "IGNORE_FAILURES_OPT",
+  "IGNORE_REMOVE_FAILURES_OPT",
   "IGNORE_SECONDARIES_OPT",
   "IGNORE_SIZE_OPT",
   "MAC_PREFIX_OPT",
+  "MAINTAIN_NODE_HEALTH_OPT",
   "MASTER_NETDEV_OPT",
   "MC_OPT",
   "NET_OPT",
   "NEW_CLUSTER_CERT_OPT",
+  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
   "NEW_CONFD_HMAC_KEY_OPT",
   "NEW_RAPI_CERT_OPT",
   "NEW_SECONDARY_OPT",
@@ -109,6 +116,9 @@ __all__ = [
   "RAPI_CERT_OPT",
   "READD_OPT",
   "REBOOT_TYPE_OPT",
+  "REMOVE_INSTANCE_OPT",
+  "REMOVE_UIDS_OPT",
+  "ROMAN_OPT",
   "SECONDARY_IP_OPT",
   "SELECT_OS_OPT",
   "SEP_OPT",
@@ -122,6 +132,7 @@ __all__ = [
   "SYNC_OPT",
   "TAG_SRC_OPT",
   "TIMEOUT_OPT",
+  "UIDPOOL_OPT",
   "USEUNITS_OPT",
   "USE_REPL_NET_OPT",
   "VERBOSE_OPT",
@@ -723,6 +734,18 @@ IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                  " configuration even if there are failures"
                                  " during the removal process")
 
+IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
+                                        dest="ignore_remove_failures",
+                                        action="store_true", default=False,
+                                        help="Remove the instance from the"
+                                        " cluster configuration even if there"
+                                        " are failures during the removal"
+                                        " process")
+
+REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
+                                 action="store_true", default=False,
+                                 help="Remove the instance from the cluster")
+
 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                                help="Specifies the new secondary node",
                                metavar="NODE", default=None,
@@ -833,7 +856,6 @@ MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                                metavar="NETDEV",
                                default=constants.DEFAULT_BRIDGE)
 
-
 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                 help="Specify the default directory (cluster-"
                                 "wide) for storing the file-based disks [%s]" %
@@ -907,12 +929,61 @@ NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                     help=("Create a new HMAC key for %s" %
                                           constants.CONFD))
 
+CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
+                                       dest="cluster_domain_secret",
+                                       default=None,
+                                       help=("Load new new cluster domain"
+                                             " secret from file"))
+
+NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
+                                           dest="new_cluster_domain_secret",
+                                           default=False, action="store_true",
+                                           help=("Create a new cluster domain"
+                                                 " secret"))
+
 USE_REPL_NET_OPT = cli_option("--use-replication-network",
                               dest="use_replication_network",
                               help="Whether to use the replication network"
                               " for talking to the nodes",
                               action="store_true", default=False)
 
+MAINTAIN_NODE_HEALTH_OPT = \
+    cli_option("--maintain-node-health", dest="maintain_node_health",
+               metavar=_YORNO, default=None, type="bool",
+               help="Configure the cluster to automatically maintain node"
+               " health, by shutting down unknown instances, shutting down"
+               " unknown DRBD devices, etc.")
+
+IDENTIFY_DEFAULTS_OPT = \
+    cli_option("--identify-defaults", dest="identify_defaults",
+               default=False, action="store_true",
+               help="Identify which saved instance parameters are equal to"
+               " the current cluster defaults and set them as such, instead"
+               " of marking them as overridden")
+
+UIDPOOL_OPT = cli_option("--uid-pool", default=None,
+                         action="store", dest="uid_pool",
+                         help=("A list of user-ids or user-id"
+                               " ranges separated by commas"))
+
+ADD_UIDS_OPT = cli_option("--add-uids", default=None,
+                          action="store", dest="add_uids",
+                          help=("A list of user-ids or user-id"
+                                " ranges separated by commas, to be"
+                                " added to the user-id pool"))
+
+REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
+                             action="store", dest="remove_uids",
+                             help=("A list of user-ids or user-id"
+                                   " ranges separated by commas, to be"
+                                   " removed from the user-id pool"))
+
+ROMAN_OPT = cli_option("--roman",
+                       dest="roman_integers", default=False,
+                       action="store_true",
+                       help="Use roman numbers for positive integers")
+
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -1186,41 +1257,31 @@ def SendJob(ops, cl=None):
   return job_id
 
 
-def PollJob(job_id, cl=None, feedback_fn=None):
-  """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+  """Generic job-polling function.
 
-  @type job_id: job identified
-  @param job_id: the job to poll for results
-  @type cl: luxi.Client
-  @param cl: the luxi client to use for communicating with the master;
-             if None, a new client will be created
+  @type job_id: number
+  @param job_id: Job ID
+  @type cbs: Instance of L{JobPollCbBase}
+  @param cbs: Data callbacks
+  @type report_cbs: Instance of L{JobPollReportCbBase}
+  @param report_cbs: Reporting callbacks
 
   """
-  if cl is None:
-    cl = GetClient()
-
   prev_job_info = None
   prev_logmsg_serial = None
 
   status = None
 
-  notified_queued = False
-  notified_waitlock = False
-
   while True:
-    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
-                                     prev_logmsg_serial)
+    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                      prev_logmsg_serial)
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
-    elif result == constants.JOB_NOTCHANGED:
-      if status is not None and not callable(feedback_fn):
-        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
-          ToStderr("Job %s is waiting in queue", job_id)
-          notified_queued = True
-        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
-          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
-          notified_waitlock = True
+
+    if result == constants.JOB_NOTCHANGED:
+      report_cbs.ReportNotChanged(job_id, status)
 
       # Wait again
       continue
@@ -1231,12 +1292,9 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     if log_entries:
       for log_entry in log_entries:
-        (serial, timestamp, _, message) = log_entry
-        if callable(feedback_fn):
-          feedback_fn(log_entry[1:])
-        else:
-          encoded = utils.SafeEncode(message)
-          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+        (serial, timestamp, log_type, message) = log_entry
+        report_cbs.ReportLogMessage(job_id, serial, timestamp,
+                                    log_type, message)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
@@ -1248,30 +1306,189 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     prev_job_info = job_info
 
-  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
   status, opstatus, result = jobs[0]
+
   if status == constants.JOB_STATUS_SUCCESS:
     return result
-  elif status in (constants.JOB_STATUS_CANCELING,
-                  constants.JOB_STATUS_CANCELED):
+
+  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
     raise errors.OpExecError("Job was canceled")
+
+  has_ok = False
+  for idx, (status, msg) in enumerate(zip(opstatus, result)):
+    if status == constants.OP_STATUS_SUCCESS:
+      has_ok = True
+    elif status == constants.OP_STATUS_ERROR:
+      errors.MaybeRaise(msg)
+
+      if has_ok:
+        raise errors.OpExecError("partial failure (opcode %d): %s" %
+                                 (idx, msg))
+
+      raise errors.OpExecError(str(msg))
+
+  # default failure mode
+  raise errors.OpExecError(result)
+
+
+class JobPollCbBase:
+  """Base class for L{GenericPollJob} callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    raise NotImplementedError()
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    @type job_ids: list of numbers
+    @param job_ids: Job IDs
+    @type fields: list of strings
+    @param fields: Fields
+
+    """
+    raise NotImplementedError()
+
+
+class JobPollReportCbBase:
+  """Base class for L{GenericPollJob} reporting callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    raise NotImplementedError()
+
+  def ReportNotChanged(self, job_id, status):
+    """Called for if a job hasn't changed in a while.
+
+    @type job_id: number
+    @param job_id: Job ID
+    @type status: string or None
+    @param status: Job status if available
+
+    """
+    raise NotImplementedError()
+
+
+class _LuxiJobPollCb(JobPollCbBase):
+  def __init__(self, cl):
+    """Initializes this class.
+
+    """
+    JobPollCbBase.__init__(self)
+    self.cl = cl
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    return self.cl.WaitForJobChangeOnce(job_id, fields,
+                                        prev_job_info, prev_log_serial)
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    """
+    return self.cl.QueryJobs(job_ids, fields)
+
+
+class FeedbackFnJobPollReportCb(JobPollReportCbBase):
+  def __init__(self, feedback_fn):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.feedback_fn = feedback_fn
+
+    assert callable(feedback_fn)
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    self.feedback_fn((timestamp, log_type, log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    # Ignore
+
+
+class StdioJobPollReportCb(JobPollReportCbBase):
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.notified_queued = False
+    self.notified_waitlock = False
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
+             utils.SafeEncode(log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    if status is None:
+      return
+
+    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
+      ToStderr("Job %s is waiting in queue", job_id)
+      self.notified_queued = True
+
+    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
+      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+      self.notified_waitlock = True
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+  """Function to poll for the result of a job.
+
+  @type job_id: job identified
+  @param job_id: the job to poll for results
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  if feedback_fn:
+    reporter = FeedbackFnJobPollReportCb(feedback_fn)
   else:
-    has_ok = False
-    for idx, (status, msg) in enumerate(zip(opstatus, result)):
-      if status == constants.OP_STATUS_SUCCESS:
-        has_ok = True
-      elif status == constants.OP_STATUS_ERROR:
-        errors.MaybeRaise(msg)
-        if has_ok:
-          raise errors.OpExecError("partial failure (opcode %d): %s" %
-                                   (idx, msg))
-        else:
-          raise errors.OpExecError(str(msg))
-    # default failure mode
-    raise errors.OpExecError(result)
+    reporter = StdioJobPollReportCb()
+
+  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
 
 
 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
@@ -1414,8 +1631,6 @@ def FormatError(err):
     obuf.write("Parameter Error: %s" % msg)
   elif isinstance(err, errors.ParameterError):
     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
-  elif isinstance(err, errors.GenericError):
-    obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
     obuf.write("Cannot communicate with the master daemon.\nIs it running"
                " and listening for connections?")
@@ -1425,6 +1640,8 @@ def FormatError(err):
   elif isinstance(err, luxi.ProtocolError):
     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
                "%s" % msg)
+  elif isinstance(err, errors.GenericError):
+    obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, JobSubmittedException):
     obuf.write("JobID: %s\n" % err.args[0])
     retcode = 0
@@ -1528,9 +1745,12 @@ def GenericInstanceCreate(mode, opts, args):
   elif opts.no_nics:
     # no nics
     nics = []
-  else:
+  elif mode == constants.INSTANCE_CREATE:
     # default of one nic, all auto
     nics = [{}]
+  else:
+    # mode == import
+    nics = []
 
   if opts.disk_template == constants.DT_DISKLESS:
     if opts.disks or opts.sd_size is not None:
@@ -1538,18 +1758,23 @@ def GenericInstanceCreate(mode, opts, args):
                                  " information passed")
     disks = []
   else:
-    if not opts.disks and not opts.sd_size:
+    if (not opts.disks and not opts.sd_size
+        and mode == constants.INSTANCE_CREATE):
       raise errors.OpPrereqError("No disk information specified")
     if opts.disks and opts.sd_size is not None:
       raise errors.OpPrereqError("Please use either the '--disk' or"
                                  " '-s' option")
     if opts.sd_size is not None:
       opts.disks = [(0, {"size": opts.sd_size})]
-    try:
-      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
-    except ValueError, err:
-      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
-    disks = [{}] * disk_max
+
+    if opts.disks:
+      try:
+        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
+      except ValueError, err:
+        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
+      disks = [{}] * disk_max
+    else:
+      disks = []
     for didx, ddict in opts.disks:
       didx = int(didx)
       if not isinstance(ddict, dict):
@@ -1583,12 +1808,14 @@ def GenericInstanceCreate(mode, opts, args):
     src_node = None
     src_path = None
     no_install = opts.no_install
+    identify_defaults = False
   elif mode == constants.INSTANCE_IMPORT:
     start = False
     os_type = None
     src_node = opts.src_node
     src_path = opts.src_dir
     no_install = None
+    identify_defaults = opts.identify_defaults
   else:
     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
 
@@ -1611,7 +1838,8 @@ def GenericInstanceCreate(mode, opts, args):
                                 os_type=os_type,
                                 src_node=src_node,
                                 src_path=src_path,
-                                no_install=no_install)
+                                no_install=no_install,
+                                identify_defaults=identify_defaults)
 
   SubmitOrSend(op, opts)
   return 0
@@ -2020,11 +2248,18 @@ class JobExecutor(object):
     SetGenericOpcodeOpts(ops, self.opts)
     self.queue.append((name, ops))
 
-  def SubmitPending(self):
+  def SubmitPending(self, each=False):
     """Submit all pending jobs.
 
     """
-    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+    if each:
+      results = []
+      for row in self.queue:
+        # SubmitJob will remove the success status, but raise an exception if
+        # the submission fails, so we'll notice that anyway.
+        results.append([True, self.cl.SubmitJob(row[1])])
+    else:
+      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                             self.queue)):
       self.jobs.append((idx, status, data, name))
@@ -2069,7 +2304,7 @@ class JobExecutor(object):
         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
 
     # first, remove any non-submitted jobs
-    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
     for idx, _, jid, name in failures:
       ToStderr("Failed to submit job for %s: %s", name, jid)
       results.append((idx, False, jid))
@@ -2106,7 +2341,7 @@ class JobExecutor(object):
     else:
       if not self.jobs:
         self.SubmitPending()
-      for status, result, name in self.jobs:
+      for _, status, result, name in self.jobs:
         if status:
           ToStdout("%s: %s", result, name)
         else: