+
+ @staticmethod
+ def _GetJobPath(job_id):
+ """Returns the job file for a given job id.
+
+ @type job_id: str
+ @param job_id: the job identifier
+ @rtype: str
+ @return: the path to the job file
+
+ """
+ return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
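+
+ # Illustrative mapping (the actual QUEUE_DIR value is
+ # installation-dependent): _GetJobPath("17") yields
+ # "<QUEUE_DIR>/job-17".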
+
+ @classmethod
+ def _GetArchivedJobPath(cls, job_id):
+ """Returns the archived job file for a give job id.
+
+ @type job_id: str
+ @param job_id: the job identifier
+ @rtype: str
+ @return: the path to the archived job file
+
+ """
+ return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+ cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
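+
+ # Sketch, assuming _GetArchiveDirectory (not shown here) buckets job
+ # ids into subdirectories: an archived job "17" would live at
+ # "<JOB_QUEUE_ARCHIVE_DIR>/<bucket>/job-17".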
+
+ def _GetJobIDsUnlocked(self, sort=True):
+ """Return all known job IDs.
+
+ The method only looks at the disk because it is a requirement that
+ all jobs are present on disk (therefore, the _memcache cannot
+ contain any IDs that are not also on disk).
+
+ @type sort: boolean
+ @param sort: perform sorting on the returned job ids
+ @rtype: list
+ @return: the list of job IDs
+
+ """
+ jlist = []
+ for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
+ m = self._RE_JOB_FILE.match(filename)
+ if m:
+ jlist.append(m.group(1))
+ if sort:
+ jlist = utils.NiceSort(jlist)
+ return jlist
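+
+ # Illustrative behaviour, assuming _RE_JOB_FILE matches names of the
+ # form "job-<id>" and captures the id: a queue directory containing
+ # ["job-3", "job-10", "serial"] yields ["3", "10"], with NiceSort
+ # ordering the ids numerically rather than lexicographically.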
+
+ def _LoadJobUnlocked(self, job_id):
+ """Loads a job from the disk or memory.
+
+ Given a job id, this will return the cached job object if it
+ exists, or try to load the job from disk. If loading from disk
+ succeeds, the job is also added to the cache.
+
+ @param job_id: the job id
+ @rtype: L{_QueuedJob} or None
+ @return: either None or the job object
+
+ """
+ job = self._memcache.get(job_id, None)
+ if job:
+ logging.debug("Found job %s in memcache", job_id)
+ return job
+
+ try:
+ job = self._LoadJobFromDisk(job_id)
+ if job is None:
+ return job
+ except errors.JobFileCorrupted:
+ old_path = self._GetJobPath(job_id)
+ new_path = self._GetArchivedJobPath(job_id)
+ if old_path == new_path:
+ # job already archived (future case)
+ logging.exception("Can't parse job %s", job_id)
+ else:
+ # non-archived case
+ logging.exception("Can't parse job %s, will archive.", job_id)
+ self._RenameFilesUnlocked([(old_path, new_path)])
+ return None
+
+ self._memcache[job_id] = job
+ logging.debug("Added job %s to the cache", job_id)
+ return job
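+
+ # Net effect (sketch): a cache hit returns immediately; a cache miss
+ # reads the job from disk and caches it; a corrupted, not-yet-archived
+ # job file is moved to the archive so later loads do not fail on it
+ # again, and None is returned.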
+
+ def _LoadJobFromDisk(self, job_id):
+ """Load the given job file from disk.
+
+ Given a job id, read, parse and restore the corresponding job file
+ into a L{_QueuedJob} object.
+
+ @type job_id: string
+ @param job_id: job identifier
+ @rtype: L{_QueuedJob} or None
+ @return: either None or the job object
+
+ """
+ filepath = self._GetJobPath(job_id)
+ logging.debug("Loading job from %s", filepath)
+ try:
+ raw_data = utils.ReadFile(filepath)
+ except EnvironmentError, err:
+ if err.errno in (errno.ENOENT, ):
+ return None
+ raise
+
+ try:
+ data = serializer.LoadJson(raw_data)
+ job = _QueuedJob.Restore(self, data)
+ except Exception, err: # pylint: disable-msg=W0703
+ raise errors.JobFileCorrupted(err)
+
+ return job
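+
+ # Error contract of the above: a missing job file yields None, other
+ # I/O errors propagate to the caller, and unparseable contents are
+ # wrapped in errors.JobFileCorrupted.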
+
+ def SafeLoadJobFromDisk(self, job_id):
+ """Load the given job file from disk.
+
+ Like L{_LoadJobFromDisk}, but if the job file cannot be read or
+ parsed, None is returned and the exception is logged.
+
+ @type job_id: string
+ @param job_id: job identifier
+ @rtype: L{_QueuedJob} or None
+ @return: either None or the job object
+
+ """
+ try:
+ return self._LoadJobFromDisk(job_id)
+ except (errors.JobFileCorrupted, EnvironmentError):
+ logging.exception("Can't load/parse job %s", job_id)
+ return None
+
+ def _UpdateQueueSizeUnlocked(self):
+ """Update the queue size.
+
+ """
+ self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def SetDrainFlag(self, drain_flag):
+ """Sets the drain flag for the queue.
+
+ @type drain_flag: boolean
+ @param drain_flag: Whether to set or unset the drain flag
+
+ """
+ jstore.SetDrainFlag(drain_flag)
+
+ self._drained = drain_flag
+
+ return True
+
+ @_RequireOpenQueue
+ def _SubmitJobUnlocked(self, job_id, ops):
+ """Create and store a new job.
+
+ This enters the job into our job queue and writes it to disk; the
+ caller is expected to hand the returned job to L{_EnqueueJobs} so it
+ is picked up by the queue processors.
+
+ @type job_id: string
+ @param job_id: the job ID for the new job
+ @type ops: list
+ @param ops: The list of OpCodes that will become the new job.
+ @rtype: L{_QueuedJob}
+ @return: the job object to be queued
+ @raise errors.JobQueueDrainError: if the job queue is marked for draining
+ @raise errors.JobQueueFull: if the job queue has too many jobs in it
+ @raise errors.GenericError: If an opcode is not valid
+
+ """
+ # Ok when sharing the big job queue lock, as the drain file is created when
+ # the lock is exclusive.
+ if self._drained:
+ raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+ if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+ raise errors.JobQueueFull()
+
+ job = _QueuedJob(self, job_id, ops)
+
+ # Check priority
+ for idx, op in enumerate(job.ops):
+ if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
+ allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+ raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
+ " are %s" % (idx, op.priority, allowed))
+
+ # Write to disk
+ self.UpdateJobUnlocked(job)
+
+ self._queue_size += 1
+
+ logging.debug("Adding new job %s to the cache", job_id)
+ self._memcache[job_id] = job
+
+ return job
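+
+ # Illustrative failure mode (hypothetical priority values, assuming
+ # OP_PRIO_SUBMIT_VALID enumerates the priorities accepted at submit
+ # time): an opcode submitted with priority 99 would raise GenericError
+ # here, before the job is ever written to disk.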
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def SubmitJob(self, ops):
+ """Create and store a new job.
+
+ @see: L{_SubmitJobUnlocked}
+
+ """
+ job_id = self._NewSerialsUnlocked(1)[0]
+ self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
+ return job_id
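+
+ # Usage sketch (hypothetical opcode; real opcode classes live in the
+ # opcodes module):
+ #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=3.0)])
+ #   # ... the id can then be passed to WaitForJobChanges or QueryJobs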
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def SubmitManyJobs(self, jobs):
+ """Create and store multiple jobs.
+
+ @see: L{_SubmitJobUnlocked}
+
+ """
+ results = []
+ added_jobs = []
+ all_job_ids = self._NewSerialsUnlocked(len(jobs))
+ for job_id, ops in zip(all_job_ids, jobs):
+ try:
+ added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
+ status = True
+ data = job_id
+ except errors.GenericError, err:
+ data = ("%s; opcodes %s" %
+ (err, utils.CommaJoin(op.Summary() for op in ops)))
+ status = False
+ results.append((status, data))
+
+ self._EnqueueJobs(added_jobs)
+
+ return results
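+
+ # The result list mirrors the input order, e.g. (illustrative):
+ #   [(True, "42"), (False, "<error>; opcodes <opcode summaries>")]
+ # so callers can tell per job whether submission succeeded.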
+
+ def _EnqueueJobs(self, jobs):
+ """Helper function to add jobs to worker pool's queue.
+
+ @type jobs: list
+ @param jobs: List of all jobs
+
+ """
+ self._wpool.AddManyTasks([(job, ) for job in jobs],
+ priority=[job.CalcPriority() for job in jobs])
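+
+ # Note: each task is queued with its own priority; CalcPriority is
+ # assumed to derive a job-level priority from its remaining opcodes.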
+
+ @_RequireOpenQueue
+ def UpdateJobUnlocked(self, job, replicate=True):
+ """Update a job's on disk storage.
+
+ After a job has been modified, this function needs to be called in
+ order to write the changes to disk and replicate them to the other
+ nodes.
+
+ @type job: L{_QueuedJob}
+ @param job: the changed job
+ @type replicate: boolean
+ @param replicate: whether to replicate the change to remote nodes
+
+ """
+ filename = self._GetJobPath(job.id)
+ data = serializer.DumpJson(job.Serialize(), indent=False)
+ logging.debug("Writing job %s to %s", job.id, filename)
+ self._UpdateJobQueueFile(filename, data, replicate)
+
+ def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
+ timeout):
+ """Waits for changes in a job.
+
+ @type job_id: string
+ @param job_id: Job identifier
+ @type fields: list of strings
+ @param fields: Which fields to check for changes
+ @type prev_job_info: list or None
+ @param prev_job_info: Last job information returned
+ @type prev_log_serial: int
+ @param prev_log_serial: Last job message serial number
+ @type timeout: float
+ @param timeout: maximum time to wait in seconds
+ @rtype: tuple (job info, log entries)
+ @return: a tuple of the job information as required via
+ the fields parameter, and the log entries as a list
+
+ If the job has not changed and the timeout has expired, the
+ special value L{constants.JOB_NOTCHANGED} is returned instead,
+ and clients should interpret it as such.
+
+ """
+ load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
+
+ helper = _WaitForJobChangesHelper()
+
+ return helper(self._GetJobPath(job_id), load_fn,
+ fields, prev_job_info, prev_log_serial, timeout)
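+
+ # Caller sketch (illustrative argument values):
+ #   result = queue.WaitForJobChanges(job_id, ["status"], None, 0, 10.0)
+ #   if result == constants.JOB_NOTCHANGED:
+ #     ...  # nothing new within the timeout; retry or report upwards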
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def CancelJob(self, job_id):
+ """Cancels a job.
+
+ This will only succeed if the job has not started yet.
+
+ @type job_id: string
+ @param job_id: job ID of job to be cancelled.
+
+ """
+ logging.info("Cancelling job %s", job_id)
+
+ job = self._LoadJobUnlocked(job_id)
+ if not job:
+ logging.debug("Job %s not found", job_id)
+ return (False, "Job %s not found" % job_id)
+
+ (success, msg) = job.Cancel()
+
+ if success:
+ self.UpdateJobUnlocked(job)
+
+ return (success, msg)
+
+ @_RequireOpenQueue
+ def _ArchiveJobsUnlocked(self, jobs):
+ """Archives jobs.
+
+ @type jobs: list of L{_QueuedJob}
+ @param jobs: Job objects
+ @rtype: int
+ @return: Number of archived jobs
+
+ """
+ archive_jobs = []
+ rename_files = []
+ for job in jobs:
+ if job.CalcStatus() not in constants.JOBS_FINALIZED:
+ logging.debug("Job %s is not yet done", job.id)
+ continue
+
+ archive_jobs.append(job)
+
+ old = self._GetJobPath(job.id)
+ new = self._GetArchivedJobPath(job.id)
+ rename_files.append((old, new))
+
+ # TODO: What if 1..n files fail to rename?
+ self._RenameFilesUnlocked(rename_files)
+
+ logging.debug("Successfully archived job(s) %s",
+ utils.CommaJoin(job.id for job in archive_jobs))
+
+ # Since we haven't quite checked, above, whether renaming the files
+ # succeeded or failed, we update the cached queue size from the
+ # filesystem. Once the TODO above is fixed, we can use the number of
+ # actually archived jobs instead.
+ self._UpdateQueueSizeUnlocked()
+ return len(archive_jobs)
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def ArchiveJob(self, job_id):
+ """Archives a job.
+
+ This is just a wrapper over L{_ArchiveJobsUnlocked}.
+
+ @type job_id: string
+ @param job_id: Job ID of job to be archived.
+ @rtype: bool
+ @return: Whether job was archived
+
+ """
+ logging.info("Archiving job %s", job_id)
+
+ job = self._LoadJobUnlocked(job_id)
+ if not job:
+ logging.debug("Job %s not found", job_id)
+ return False
+
+ return self._ArchiveJobsUnlocked([job]) == 1
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def AutoArchiveJobs(self, age, timeout):
+ """Archives all jobs based on age.
+
+ The method will archive all jobs which are older than the age
+ parameter. For jobs that don't have an end timestamp, the start
+ timestamp is considered instead (and, failing that, the received
+ timestamp). The special age '-1' causes archival of all finalized
+ jobs (i.e. jobs that are no longer running or queued).
+
+ @type age: int
+ @param age: the minimum age in seconds
+ @type timeout: float
+ @param timeout: maximum time in seconds to spend archiving jobs
+ @rtype: tuple
+ @return: a tuple of (number of jobs archived, number of jobs not
+ yet inspected)
+
+ """
+ logging.info("Archiving jobs with age more than %s seconds", age)
+
+ now = time.time()
+ end_time = now + timeout
+ archived_count = 0
+ last_touched = 0
+
+ all_job_ids = self._GetJobIDsUnlocked()
+ pending = []
+ for idx, job_id in enumerate(all_job_ids):
+ last_touched = idx + 1
+
+ # Not optimal because jobs could be pending
+ # TODO: Measure average duration for job archival and take number of
+ # pending jobs into account.
+ if time.time() > end_time:
+ break
+
+ # Returns None if the job failed to load
+ job = self._LoadJobUnlocked(job_id)
+ if job:
+ if job.end_timestamp is None:
+ if job.start_timestamp is None:
+ job_age = job.received_timestamp
+ else:
+ job_age = job.start_timestamp
+ else:
+ job_age = job.end_timestamp
+
+ if age == -1 or now - job_age[0] > age:
+ pending.append(job)
+
+ # Archive 10 jobs at a time
+ if len(pending) >= 10:
+ archived_count += self._ArchiveJobsUnlocked(pending)
+ pending = []
+
+ if pending:
+ archived_count += self._ArchiveJobsUnlocked(pending)
+
+ return (archived_count, len(all_job_ids) - last_touched)
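+
+ # Example outcome (hypothetical numbers): with 250 job ids on disk and
+ # the timeout expiring after 120 of them were inspected, 100 of which
+ # were old enough to archive, the call returns (100, 130).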
+
+ def QueryJobs(self, job_ids, fields):
+ """Returns a list of jobs in queue.
+
+ @type job_ids: list
+ @param job_ids: sequence of job identifiers or None for all
+ @type fields: list
+ @param fields: names of fields to return
+ @rtype: list
+ @return: a list with one element per job, each element being a
+ list with the requested fields
+
+ """
+ jobs = []
+ list_all = False
+ if not job_ids:
+ # Since files are added to/removed from the queue atomically, there's no
+ # risk of getting the job ids in an inconsistent state.
+ job_ids = self._GetJobIDsUnlocked()
+ list_all = True
+
+ for job_id in job_ids:
+ job = self.SafeLoadJobFromDisk(job_id)
+ if job is not None:
+ jobs.append(job.GetInfo(fields))
+ elif not list_all:
+ jobs.append(None)
+
+ return jobs
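+
+ # Illustrative results (hypothetical status values):
+ #   QueryJobs(["1", "999"], ["id", "status"]) -> [["1", "running"], None]
+ # (an unknown or unreadable id maps to None); with job_ids=None,
+ # unreadable jobs are silently omitted instead.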
+
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def Shutdown(self):
+ """Stops the job queue.
+
+ This shuts down all the worker threads and closes the queue.
+
+ """
+ self._wpool.TerminateWorkers()
+
+ self._queue_filelock.Close()
+ self._queue_filelock = None