Disable synchronous (locking) queries
[ganeti-local] / lib / jqueue.py
index 3734476..3364a93 100644 (file)
@@ -49,6 +49,7 @@ from ganeti import rpc
 
 
 JOBQUEUE_THREADS = 25
 
 
 JOBQUEUE_THREADS = 25
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
 
 class CancelJob(Exception):
 
 
 class CancelJob(Exception):
@@ -343,7 +344,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
     @param job: the job to be processed
 
     """
-    logging.debug("Worker %s processing job %s",
+    logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
@@ -351,11 +352,15 @@ class _JobQueueWorker(workerpool.BaseWorker):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
+          op_summary = op.input.Summary()
           try:
           try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                         op_summary)
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
@@ -407,8 +412,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
+            logging.info("Op %s/%s: Successfully finished opcode %s",
+                         idx + 1, count, op_summary)
           except CancelJob:
             # Will be handled further up
             raise
           except CancelJob:
             # Will be handled further up
             raise
@@ -419,7 +424,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -448,8 +454,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+      logging.info("Worker %s finished job %s, status = %s",
+                   self.worker_id, job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -702,24 +708,24 @@ class JobQueue(object):
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
-  def _RenameFileUnlocked(self, old, new):
+  def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
-    @type old: str
-    @param old: the current name of the file
-    @type new: str
-    @param new: the new name of the file
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
 
     """
 
     """
-    os.rename(old, new)
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
 
 
+    # ... and on all nodes
     names, addrs = self._GetNodeIp()
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
@@ -741,6 +747,18 @@ class JobQueue(object):
 
     return str(job_id)
 
 
     return str(job_id)
 
+  @classmethod
+  def _GetArchiveDirectory(cls, job_id):
+    """Returns the archive directory for a job.
+
+    @type job_id: str
+    @param job_id: Job identifier
+    @rtype: str
+    @return: Directory name, i.e. job ID // JOBS_PER_ARCHIVE_DIRECTORY
+
+    """
+    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)  # floor division
+
   def _NewSerialUnlocked(self):
     """Generates a new job identifier.
 
   def _NewSerialUnlocked(self):
     """Generates a new job identifier.
 
@@ -774,8 +792,8 @@ class JobQueue(object):
     """
     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
 
     """
     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
 
-  @staticmethod
-  def _GetArchivedJobPath(job_id):
+  @classmethod
+  def _GetArchivedJobPath(cls, job_id):
     """Returns the archived job file for a give job id.
 
     @type job_id: str
     """Returns the archived job file for a give job id.
 
     @type job_id: str
@@ -784,7 +802,8 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
     @return: the path to the archived job file
 
     """
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
+    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
+    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
 
   @classmethod
   def _ExtractJobID(cls, name):
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -872,7 +891,7 @@ class JobQueue(object):
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFileUnlocked(filepath, new_path)
+        self._RenameFilesUnlocked([(filepath, new_path)])
       return None
 
     self._memcache[job_id] = job
       return None
 
     self._memcache[job_id] = job
@@ -1103,55 +1122,69 @@ class JobQueue(object):
     """
     try:
       for op in job.ops:
     """
     try:
       for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
+        op.status = constants.OP_STATUS_CANCELED
         op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
         op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
-  def _ArchiveJobUnlocked(self, job_id):
-    """Archives a job.
+  def _ArchiveJobsUnlocked(self, jobs):
+    """Archives jobs.
 
 
-    @type job_id: string
-    @param job_id: the ID of job to be archived
+    @type jobs: list of L{_QueuedJob}
+    @param jobs: Job objects
+    @rtype: int
+    @return: Number of archived jobs
 
     """
 
     """
-    logging.info("Archiving job %s", job_id)
+    archive_jobs = []
+    rename_files = []
+    for job in jobs:
+      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
+                                  constants.JOB_STATUS_SUCCESS,
+                                  constants.JOB_STATUS_ERROR):
+        logging.debug("Job %s is not yet done", job.id)
+        continue
 
 
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return
+      archive_jobs.append(job)
 
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                constants.JOB_STATUS_SUCCESS,
-                                constants.JOB_STATUS_ERROR):
-      logging.debug("Job %s is not yet done", job.id)
-      return
+      old = self._GetJobPath(job.id)
+      new = self._GetArchivedJobPath(job.id)
+      rename_files.append((old, new))
 
 
-    old = self._GetJobPath(job.id)
-    new = self._GetArchivedJobPath(job.id)
+    # TODO: What if 1..n files fail to rename?
+    self._RenameFilesUnlocked(rename_files)
 
 
-    self._RenameFileUnlocked(old, new)
+    logging.debug("Successfully archived job(s) %s",
+                  ", ".join(job.id for job in archive_jobs))
 
 
-    logging.debug("Successfully archived job %s", job.id)
+    return len(archive_jobs)
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
-    This is just a wrapper over L{_ArchiveJobUnlocked}.
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
+    @rtype: bool
+    @return: Whether job was archived
 
     """
 
     """
-    return self._ArchiveJobUnlocked(job_id)
+    logging.info("Archiving job %s", job_id)
+
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return False
+
+    return self._ArchiveJobsUnlocked([job]) == 1
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AutoArchiveJobs(self, age):
+  def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
@@ -1166,22 +1199,44 @@ class JobQueue(object):
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for jid in self._GetJobIDsUnlocked(archived=False):
-      job = self._LoadJobUnlocked(jid)
-      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
-                                  constants.OP_STATUS_ERROR,
-                                  constants.OP_STATUS_CANCELED):
-        continue
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0  # number of job IDs examined so far
+
+    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
+      last_touched = idx + 1  # counted only once past the timeout check
+
+      # Returns None if the job failed to load
+      job = self._LoadJobUnlocked(job_id)
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
+
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
+
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
+
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
 
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(jid)
+    return (archived_count, len(all_job_ids) - last_touched)
 
 
   def _GetJobInfoUnlocked(self, job, fields):
     """Returns information about a job.
 
   def _GetJobInfoUnlocked(self, job, fields):
     """Returns information about a job.