Switch gnt-debug submit-job to JobExecutor
[ganeti-local] / lib / jqueue.py
index d73136f..083031c 100644 (file)
@@ -49,9 +49,10 @@ from ganeti import rpc
 
 
 JOBQUEUE_THREADS = 25
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
 
-class CancelJob:
+class CancelJob(Exception):
   """Special exception to cancel a job.
 
   """
@@ -68,7 +69,7 @@ def TimeStampNow():
 
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -79,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -151,6 +156,11 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -285,7 +295,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -343,7 +353,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.debug("Worker %s processing job %s",
+    logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
@@ -351,11 +361,24 @@ class _JobQueueWorker(workerpool.BaseWorker):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
+          op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                         op_summary)
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
@@ -407,8 +430,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
+            logging.info("Op %s/%s: Successfully finished opcode %s",
+                         idx + 1, count, op_summary)
           except CancelJob:
             # Will be handled further up
             raise
@@ -419,7 +442,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -440,7 +464,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -448,8 +472,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+      logging.info("Worker %s finished job %s, status = %s",
+                   self.worker_id, job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -463,7 +487,7 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class JobQueue(object):
-  """Quue used to manaage the jobs.
+  """Queue used to manage the jobs.
 
   @cvar _RE_JOB_FILE: regex matching the valid job file names
 
@@ -522,7 +546,8 @@ class JobQueue(object):
 
     # Get initial list of nodes
     self._nodes = dict((n.name, n.primary_ip)
-                       for n in self.context.cfg.GetAllNodesInfo().values())
+                       for n in self.context.cfg.GetAllNodesInfo().values()
+                       if n.master_candidate)
 
     # Remove master node
     try:
@@ -542,13 +567,14 @@ class JobQueue(object):
         logging.info("Inspecting job queue")
 
         all_job_ids = self._GetJobIDsUnlocked()
+        jobs_count = len(all_job_ids)
         lastinfo = time.time()
         for idx, job_id in enumerate(all_job_ids):
           # Give an update every 1000 jobs or 10 seconds
-          if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
-            jobs_count = len(all_job_ids)
+          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+              idx == (jobs_count - 1)):
             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                         idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
+                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
             lastinfo = time.time()
 
           job = self._LoadJobUnlocked(job_id)
@@ -595,6 +621,12 @@ class JobQueue(object):
     # Clean queue directory on added node
     rpc.RpcRunner.call_jobqueue_purge(node_name)
 
+    if not node.master_candidate:
+      # remove if existing, ignoring errors
+      self._nodes.pop(node_name, None)
+      # and skip the replication of the job ids
+      return
+
     # Upload the whole queue excluding archived jobs
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
@@ -637,7 +669,7 @@ class JobQueue(object):
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fails.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
@@ -694,24 +726,24 @@ class JobQueue(object):
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
-  def _RenameFileUnlocked(self, old, new):
+  def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
-    @type old: str
-    @param old: the current name of the file
-    @type new: str
-    @param new: the new name of the file
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
 
     """
-    os.rename(old, new)
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
 
+    # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
@@ -733,6 +765,18 @@ class JobQueue(object):
 
     return str(job_id)
 
+  @classmethod
+  def _GetArchiveDirectory(cls, job_id):
+    """Returns the archive directory for a job.
+
+    @type job_id: str
+    @param job_id: Job identifier
+    @rtype: str
+    @return: Directory name
+
+    """
+    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
+
   def _NewSerialUnlocked(self):
     """Generates a new job identifier.
 
@@ -766,8 +810,8 @@ class JobQueue(object):
     """
     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
 
-  @staticmethod
-  def _GetArchivedJobPath(job_id):
+  @classmethod
+  def _GetArchivedJobPath(cls, job_id):
     """Returns the archived job file for a give job id.
 
     @type job_id: str
@@ -776,7 +820,8 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
+    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
+    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -864,7 +909,7 @@ class JobQueue(object):
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFileUnlocked(filepath, new_path)
+        self._RenameFilesUnlocked([(filepath, new_path)])
       return None
 
     self._memcache[job_id] = job
@@ -907,7 +952,7 @@ class JobQueue(object):
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
@@ -916,9 +961,8 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
@@ -932,7 +976,19 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    # Check job queue size
+    size = len(self._ListJobFiles())
+    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
+      # TODO: Autoarchive jobs. Make sure it's not done on every job
+      # submission, though.
+      #size = ...
+      pass
+
+    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+      raise errors.JobQueueFull()
+
     # Get job identifier
     job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
@@ -948,6 +1004,37 @@ class JobQueue(object):
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    return self._SubmitJobUnlocked(ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    for ops in jobs:
+      try:
+        data = self._SubmitJobUnlocked(ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1083,55 +1170,69 @@ class JobQueue(object):
     """
     try:
       for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
+        op.status = constants.OP_STATUS_CANCELED
         op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
-  def _ArchiveJobUnlocked(self, job_id):
-    """Archives a job.
+  def _ArchiveJobsUnlocked(self, jobs):
+    """Archives jobs.
 
-    @type job_id: string
-    @param job_id: Job ID of job to be archived.
+    @type jobs: list of L{_QueuedJob}
+    @param jobs: Job objects
+    @rtype: int
+    @return: Number of archived jobs
 
     """
-    logging.info("Archiving job %s", job_id)
+    archive_jobs = []
+    rename_files = []
+    for job in jobs:
+      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
+                                  constants.JOB_STATUS_SUCCESS,
+                                  constants.JOB_STATUS_ERROR):
+        logging.debug("Job %s is not yet done", job.id)
+        continue
 
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return
+      archive_jobs.append(job)
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                constants.JOB_STATUS_SUCCESS,
-                                constants.JOB_STATUS_ERROR):
-      logging.debug("Job %s is not yet done", job.id)
-      return
+      old = self._GetJobPath(job.id)
+      new = self._GetArchivedJobPath(job.id)
+      rename_files.append((old, new))
 
-    old = self._GetJobPath(job.id)
-    new = self._GetArchivedJobPath(job.id)
+    # TODO: What if 1..n files fail to rename?
+    self._RenameFilesUnlocked(rename_files)
 
-    self._RenameFileUnlocked(old, new)
+    logging.debug("Successfully archived job(s) %s",
+                  ", ".join(job.id for job in archive_jobs))
 
-    logging.debug("Successfully archived job %s", job.id)
+    return len(archive_jobs)
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
-    This is just a wrapper over L{_ArchiveJobUnlocked}.
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
+    @rtype: bool
+    @return: Whether job was archived
 
     """
-    return self._ArchiveJobUnlocked(job_id)
+    logging.info("Archiving job %s", job_id)
+
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return False
+
+    return self._ArchiveJobsUnlocked([job]) == 1
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AutoArchiveJobs(self, age):
+  def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
@@ -1146,22 +1247,44 @@ class JobQueue(object):
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for jid in self._GetJobIDsUnlocked(archived=False):
-      job = self._LoadJobUnlocked(jid)
-      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
-                                  constants.OP_STATUS_ERROR,
-                                  constants.OP_STATUS_CANCELED):
-        continue
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0
+
+    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      last_touched = idx
+
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
+      # Returns None if the job failed to load
+      job = self._LoadJobUnlocked(job_id)
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
+
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
+
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
+
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(jid)
+    return (archived_count, len(all_job_ids) - last_touched - 1)
 
   def _GetJobInfoUnlocked(self, job, fields):
     """Returns information about a job.