Fix a typo in InitCluster
[ganeti-local] / lib / jqueue.py
index dbfc49b..e06c5f8 100644 (file)
@@ -69,7 +69,7 @@ def TimeStampNow():
 
 
 class _QueuedOpCode(object):
 
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -152,6 +156,11 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -286,7 +295,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -304,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -344,7 +373,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
     @param job: the job to be processed
 
     """
-    logging.debug("Worker %s processing job %s",
+    logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
@@ -352,11 +381,24 @@ class _JobQueueWorker(workerpool.BaseWorker):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
+          op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
           try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                         op_summary)
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
@@ -408,8 +450,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
+            logging.info("Op %s/%s: Successfully finished opcode %s",
+                         idx + 1, count, op_summary)
           except CancelJob:
             # Will be handled further up
             raise
           except CancelJob:
             # Will be handled further up
             raise
@@ -420,7 +462,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -441,7 +484,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -449,8 +492,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+      logging.info("Worker %s finished job %s, status = %s",
+                   self.worker_id, job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -464,7 +507,7 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class JobQueue(object):
 
 
 class JobQueue(object):
-  """Quue used to manaage the jobs.
+  """Queue used to manage the jobs.
 
   @cvar _RE_JOB_FILE: regex matching the valid job file names
 
 
   @cvar _RE_JOB_FILE: regex matching the valid job file names
 
@@ -570,9 +613,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -646,7 +688,7 @@ class JobQueue(object):
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fails.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
 
     @param result: the data as returned from the rpc call
     @type nodes: list
@@ -703,24 +745,24 @@ class JobQueue(object):
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
-  def _RenameFileUnlocked(self, old, new):
+  def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
-    @type old: str
-    @param old: the current name of the file
-    @type new: str
-    @param new: the new name of the file
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
 
     """
 
     """
-    utils.RenameFile(old, new, mkdir=True)
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
 
 
+    # ... and on all nodes
     names, addrs = self._GetNodeIp()
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
@@ -886,7 +928,7 @@ class JobQueue(object):
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFileUnlocked(filepath, new_path)
+        self._RenameFilesUnlocked([(filepath, new_path)])
       return None
 
     self._memcache[job_id] = job
       return None
 
     self._memcache[job_id] = job
@@ -929,7 +971,7 @@ class JobQueue(object):
     and in the future we might merge them.
 
     @type drain_flag: boolean
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
 
     """
     if drain_flag:
@@ -938,9 +980,8 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
@@ -954,7 +995,7 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -982,6 +1023,37 @@ class JobQueue(object):
 
     return job.id
 
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    return self._SubmitJobUnlocked(ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    for ops in jobs:
+      try:
+        data = self._SubmitJobUnlocked(ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1029,6 +1101,10 @@ class JobQueue(object):
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
+
+    job_info = None
+    log_entries = None
+
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
@@ -1070,7 +1146,10 @@ class JobQueue(object):
 
     logging.debug("Job %s changed", job_id)
 
 
     logging.debug("Job %s changed", job_id)
 
-    return (job_info, log_entries)
+    if job_info is None and log_entries is None:
+      return None
+    else:
+      return (job_info, log_entries)
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1094,8 +1173,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1104,8 +1183,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1116,43 +1194,50 @@ class JobQueue(object):
 
     """
     try:
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
-  def _ArchiveJobUnlocked(self, job):
-    """Archives a job.
+  def _ArchiveJobsUnlocked(self, jobs):
+    """Archives jobs.
 
 
-    @type job: L{_QueuedJob}
-    @param job: Job object
-    @rtype bool
-    @return Whether job was archived
+    @type jobs: list of L{_QueuedJob}
+    @param jobs: Job objects
+    @rtype: int
+    @return: Number of archived jobs
 
     """
 
     """
-    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                constants.JOB_STATUS_SUCCESS,
-                                constants.JOB_STATUS_ERROR):
-      logging.debug("Job %s is not yet done", job.id)
-      return False
+    archive_jobs = []
+    rename_files = []
+    for job in jobs:
+      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
+                                  constants.JOB_STATUS_SUCCESS,
+                                  constants.JOB_STATUS_ERROR):
+        logging.debug("Job %s is not yet done", job.id)
+        continue
 
 
-    old = self._GetJobPath(job.id)
-    new = self._GetArchivedJobPath(job.id)
+      archive_jobs.append(job)
 
 
-    self._RenameFileUnlocked(old, new)
+      old = self._GetJobPath(job.id)
+      new = self._GetArchivedJobPath(job.id)
+      rename_files.append((old, new))
 
 
-    logging.debug("Successfully archived job %s", job.id)
+    # TODO: What if 1..n files fail to rename?
+    self._RenameFilesUnlocked(rename_files)
 
 
-    return True
+    logging.debug("Successfully archived job(s) %s",
+                  ", ".join(job.id for job in archive_jobs))
+
+    return len(archive_jobs)
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
-    This is just a wrapper over L{_ArchiveJobUnlocked}.
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
@@ -1167,11 +1252,11 @@ class JobQueue(object):
       logging.debug("Job %s not found", job_id)
       return False
 
       logging.debug("Job %s not found", job_id)
       return False
 
-    return self._ArchiveJobUnlocked(job)
+    return self._ArchiveJobsUnlocked([job]) == 1
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AutoArchiveJobs(self, age):
+  def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
@@ -1186,22 +1271,44 @@ class JobQueue(object):
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for job_id in self._GetJobIDsUnlocked(archived=False):
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0
+
+    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      last_touched = idx
+
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
       # Returns None if the job failed to load
       job = self._LoadJobUnlocked(job_id)
       # Returns None if the job failed to load
       job = self._LoadJobUnlocked(job_id)
-      if not job:
-        continue
-
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
+
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
+
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
+
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
 
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(job)
+    return (archived_count, len(all_job_ids) - last_touched - 1)
 
   def _GetJobInfoUnlocked(self, job, fields):
     """Returns information about a job.
 
   def _GetJobInfoUnlocked(self, job, fields):
     """Returns information about a job.