4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
39 # pylint: disable-msg=E0611
40 from pyinotify import pyinotify
44 from ganeti import asyncnotifier
45 from ganeti import constants
46 from ganeti import serializer
47 from ganeti import workerpool
48 from ganeti import locking
49 from ganeti import opcodes
50 from ganeti import errors
51 from ganeti import mcpu
52 from ganeti import utils
53 from ganeti import jstore
54 from ganeti import rpc
55 from ganeti import runtime
56 from ganeti import netutils
57 from ganeti import compat
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
63 # member lock names to be passed to @ssynchronized decorator
68 class CancelJob(Exception):
69 """Special exception to cancel a job.
75 """Returns the current timestamp.
78 @return: the current time in the (seconds, microseconds) format
81 return utils.SplitTime(time.time())
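# Illustrative example (assuming utils.SplitTime behaves as its name
# suggests): a wall-clock value of 1234567890.123456 would come back as
# roughly the tuple (1234567890, 123456).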
84 class _QueuedOpCode(object):
85 """Encapsulates an opcode object.
87 @ivar log: holds the execution log and consists of tuples
88 of the form C{(log_serial, timestamp, level, message)}
89 @ivar input: the OpCode we encapsulate
90 @ivar status: the current status
91 @ivar result: the result of the LU execution
92 @ivar start_timestamp: timestamp for the start of the execution
93 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94 @ivar stop_timestamp: timestamp for the end of the execution
97 __slots__ = ["input", "status", "result", "log", "priority",
98 "start_timestamp", "exec_timestamp", "end_timestamp",
101 def __init__(self, op):
102 """Constructor for the _QueuedOpCode.
104 @type op: L{opcodes.OpCode}
105 @param op: the opcode we encapsulate
109 self.status = constants.OP_STATUS_QUEUED
112 self.start_timestamp = None
113 self.exec_timestamp = None
114 self.end_timestamp = None
116 # Get initial priority (it might change during the lifetime of this opcode)
117 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120 def Restore(cls, state):
121 """Restore the _QueuedOpCode from the serialized form.
124 @param state: the serialized state
125 @rtype: _QueuedOpCode
126 @return: a new _QueuedOpCode instance
129 obj = _QueuedOpCode.__new__(cls)
130 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131 obj.status = state["status"]
132 obj.result = state["result"]
133 obj.log = state["log"]
134 obj.start_timestamp = state.get("start_timestamp", None)
135 obj.exec_timestamp = state.get("exec_timestamp", None)
136 obj.end_timestamp = state.get("end_timestamp", None)
137 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
141 """Serializes this _QueuedOpCode.
144 @return: the dictionary holding the serialized state
148 "input": self.input.__getstate__(),
149 "status": self.status,
150 "result": self.result,
152 "start_timestamp": self.start_timestamp,
153 "exec_timestamp": self.exec_timestamp,
154 "end_timestamp": self.end_timestamp,
155 "priority": self.priority,
159 class _QueuedJob(object):
160 """In-memory job representation.
162 This is what we use to track the user-submitted jobs. Locking must
163 be taken care of by users of this class.
165 @type queue: L{JobQueue}
166 @ivar queue: the parent queue
169 @ivar ops: the list of _QueuedOpCode that constitute the job
170 @type log_serial: int
171 @ivar log_serial: holds the index for the next log entry
172 @ivar received_timestamp: the timestamp for when the job was received
173 @ivar start_timestamp: the timestamp for start of execution
174 @ivar end_timestamp: the timestamp for end of execution
177 # pylint: disable-msg=W0212
178 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179 "received_timestamp", "start_timestamp", "end_timestamp",
182 def __init__(self, queue, job_id, ops):
183 """Constructor for the _QueuedJob.
185 @type queue: L{JobQueue}
186 @param queue: our parent queue
188 @param job_id: our job id
190 @param ops: the list of opcodes we hold, which will be encapsulated
195 raise errors.GenericError("A job needs at least one opcode")
199 self.ops = [_QueuedOpCode(op) for op in ops]
201 self.received_timestamp = TimeStampNow()
202 self.start_timestamp = None
203 self.end_timestamp = None
205 self._InitInMemory(self)
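# Note: _InitInMemory (below) sets up run-time only attributes such as
# ops_iter and cur_opctx; they are not part of Serialize()'s output and are
# re-created through the same helper when a job is restored from disk.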
208 def _InitInMemory(obj):
209 """Initializes in-memory variables.
216 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220 return "<%s at %#x>" % (" ".join(status), id(self))
223 def Restore(cls, queue, state):
224 """Restore a _QueuedJob from serialized state:
226 @type queue: L{JobQueue}
227 @param queue: to which queue the restored job belongs
229 @param state: the serialized state
231 @return: the restored _QueuedJob instance
234 obj = _QueuedJob.__new__(cls)
237 obj.received_timestamp = state.get("received_timestamp", None)
238 obj.start_timestamp = state.get("start_timestamp", None)
239 obj.end_timestamp = state.get("end_timestamp", None)
243 for op_state in state["ops"]:
244 op = _QueuedOpCode.Restore(op_state)
245 for log_entry in op.log:
246 obj.log_serial = max(obj.log_serial, log_entry[0])
249 cls._InitInMemory(obj)
254 """Serialize the _QueuedJob instance.
257 @return: the serialized state
262 "ops": [op.Serialize() for op in self.ops],
263 "start_timestamp": self.start_timestamp,
264 "end_timestamp": self.end_timestamp,
265 "received_timestamp": self.received_timestamp,
268 def CalcStatus(self):
269 """Compute the status of this job.
271 This function iterates over all the _QueuedOpCodes in the job and
272 based on their status, computes the job status.
275 - if we find a canceled opcode, or one that finished with an error,
276 the job status will be the same
277 - otherwise, the last opcode with the status one of:
282 will determine the job status
284 - otherwise, it means either all opcodes are queued, or success,
285 and the job status will be the same
287 @return: the job status
290 status = constants.JOB_STATUS_QUEUED
294 if op.status == constants.OP_STATUS_SUCCESS:
299 if op.status == constants.OP_STATUS_QUEUED:
301 elif op.status == constants.OP_STATUS_WAITLOCK:
302 status = constants.JOB_STATUS_WAITLOCK
303 elif op.status == constants.OP_STATUS_RUNNING:
304 status = constants.JOB_STATUS_RUNNING
305 elif op.status == constants.OP_STATUS_CANCELING:
306 status = constants.JOB_STATUS_CANCELING
308 elif op.status == constants.OP_STATUS_ERROR:
309 status = constants.JOB_STATUS_ERROR
310 # The whole job fails if one opcode failed
312 elif op.status == constants.OP_STATUS_CANCELED:
313 status = constants.JOB_STATUS_CANCELED
317 status = constants.JOB_STATUS_SUCCESS
321 def CalcPriority(self):
322 """Gets the current priority for this job.
324 Only unfinished opcodes are considered. When all are done, the default
330 priorities = [op.priority for op in self.ops
331 if op.status not in constants.OPS_FINALIZED]
334 # All opcodes are done, assume default priority
335 return constants.OP_PRIO_DEFAULT
337 return min(priorities)
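# As the priority assertions in _JobProcessor below suggest
# (OP_PRIO_HIGHEST <= priority <= OP_PRIO_LOWEST numerically), a smaller
# value means a more urgent opcode, so min() yields the job's effective
# priority.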
339 def GetLogEntries(self, newer_than):
340 """Selectively returns the log entries.
342 @type newer_than: None or int
343 @param newer_than: if this is None, return all log entries,
344 otherwise return only the log entries with serial higher
347 @return: the list of the log entries selected
350 if newer_than is None:
357 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
361 def GetInfo(self, fields):
362 """Returns information about a job.
365 @param fields: names of fields to return
367 @return: list with one element for each field
368 @raise errors.OpExecError: when an invalid field
376 elif fname == "status":
377 row.append(self.CalcStatus())
378 elif fname == "priority":
379 row.append(self.CalcPriority())
381 row.append([op.input.__getstate__() for op in self.ops])
382 elif fname == "opresult":
383 row.append([op.result for op in self.ops])
384 elif fname == "opstatus":
385 row.append([op.status for op in self.ops])
386 elif fname == "oplog":
387 row.append([op.log for op in self.ops])
388 elif fname == "opstart":
389 row.append([op.start_timestamp for op in self.ops])
390 elif fname == "opexec":
391 row.append([op.exec_timestamp for op in self.ops])
392 elif fname == "opend":
393 row.append([op.end_timestamp for op in self.ops])
394 elif fname == "oppriority":
395 row.append([op.priority for op in self.ops])
396 elif fname == "received_ts":
397 row.append(self.received_timestamp)
398 elif fname == "start_ts":
399 row.append(self.start_timestamp)
400 elif fname == "end_ts":
401 row.append(self.end_timestamp)
402 elif fname == "summary":
403 row.append([op.input.Summary() for op in self.ops])
405 raise errors.OpExecError("Invalid self query field '%s'" % fname)
408 def MarkUnfinishedOps(self, status, result):
409 """Mark unfinished opcodes with a given status and result.
411 This is a utility function for marking all running or waiting to
412 be run opcodes with a given status. Opcodes which are already
413 finalized are not changed.
415 @param status: a given opcode status
416 @param result: the opcode result
421 if op.status in constants.OPS_FINALIZED:
422 assert not_marked, "Finalized opcodes found after non-finalized ones"
429 """Marks the job as finalized.
432 self.end_timestamp = TimeStampNow()
435 """Marks the job as canceled or canceling, if possible.
437 @rtype: tuple; (bool, string)
438 @return: Boolean describing whether job was successfully canceled or marked
439 as canceling and a text message
442 status = self.CalcStatus()
444 if status == constants.JOB_STATUS_QUEUED:
445 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446 "Job canceled by request")
448 return (True, "Job %s canceled" % self.id)
450 elif status == constants.JOB_STATUS_WAITLOCK:
451 # The worker will notice the new status and cancel the job
452 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453 return (True, "Job %s will be canceled" % self.id)
456 logging.debug("Job %s is no longer waiting in the queue", self.id)
457 return (False, "Job %s is no longer waiting in the queue" % self.id)
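# Rough summary of the two cancellation paths above: a still-queued job is
# finalized as canceled on the spot, while a job waiting for locks is only
# marked OP_STATUS_CANCELING; the worker later notices this in
# _OpExecCallbacks below and aborts, presumably by raising CancelJob.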
460 class _OpExecCallbacks(mcpu.OpExecCbBase):
461 def __init__(self, queue, job, op):
462 """Initializes this class.
464 @type queue: L{JobQueue}
465 @param queue: Job queue
466 @type job: L{_QueuedJob}
467 @param job: Job object
468 @type op: L{_QueuedOpCode}
472 assert queue, "Queue is missing"
473 assert job, "Job is missing"
474 assert op, "Opcode is missing"
480 def _CheckCancel(self):
481 """Raises an exception to cancel the job if asked to.
484 # Cancel here if we were asked to
485 if self._op.status == constants.OP_STATUS_CANCELING:
486 logging.debug("Canceling opcode")
489 @locking.ssynchronized(_QUEUE, shared=1)
490 def NotifyStart(self):
491 """Mark the opcode as running, not lock-waiting.
493 This is called from the mcpu code as a notifier function, when the LU is
494 finally about to start the Exec() method. Of course, to have end-user
495 visible results, the opcode must be initially (before calling into
496 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
499 assert self._op in self._job.ops
500 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
501 constants.OP_STATUS_CANCELING)
503 # Cancel here if we were asked to
506 logging.debug("Opcode is now running")
508 self._op.status = constants.OP_STATUS_RUNNING
509 self._op.exec_timestamp = TimeStampNow()
511 # And finally replicate the job status
512 self._queue.UpdateJobUnlocked(self._job)
514 @locking.ssynchronized(_QUEUE, shared=1)
515 def _AppendFeedback(self, timestamp, log_type, log_msg):
516 """Internal feedback append function, with locks
519 self._job.log_serial += 1
520 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
521 self._queue.UpdateJobUnlocked(self._job, replicate=False)
523 def Feedback(self, *args):
524 """Append a log entry.
530 log_type = constants.ELOG_MESSAGE
533 (log_type, log_msg) = args
535 # The time is split to make serialization easier and not lose
537 timestamp = utils.SplitTime(time.time())
538 self._AppendFeedback(timestamp, log_type, log_msg)
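# Illustrative example: a Feedback("copying disk") call from an LU would be
# stored in op.log as roughly (log_serial, (sec, usec),
# constants.ELOG_MESSAGE, "copying disk"), matching the tuple format
# documented on _QueuedOpCode.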
540 def CheckCancel(self):
541 """Check whether job has been cancelled.
544 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
545 constants.OP_STATUS_CANCELING)
547 # Cancel here if we were asked to
550 def SubmitManyJobs(self, jobs):
551 """Submits jobs for processing.
553 See L{JobQueue.SubmitManyJobs}.
556 # Locking is done in job queue
557 return self._queue.SubmitManyJobs(jobs)
560 class _JobChangesChecker(object):
561 def __init__(self, fields, prev_job_info, prev_log_serial):
562 """Initializes this class.
564 @type fields: list of strings
565 @param fields: Fields requested by LUXI client
566 @type prev_job_info: string
567 @param prev_job_info: previous job info, as passed by the LUXI client
568 @type prev_log_serial: string
569 @param prev_log_serial: previous job serial, as passed by the LUXI client
572 self._fields = fields
573 self._prev_job_info = prev_job_info
574 self._prev_log_serial = prev_log_serial
576 def __call__(self, job):
577 """Checks whether job has changed.
579 @type job: L{_QueuedJob}
580 @param job: Job object
583 status = job.CalcStatus()
584 job_info = job.GetInfo(self._fields)
585 log_entries = job.GetLogEntries(self._prev_log_serial)
587 # Serializing and deserializing data can cause type changes (e.g. from
588 # tuple to list) or precision loss. We're doing it here so that we get
589 # the same modifications as the data received from the client. Without
590 # this, the comparison afterwards might fail without the data being
591 # significantly different.
592 # TODO: we just deserialized from disk, investigate how to make sure that
593 # the job info and log entries are compatible to avoid this further step.
594 # TODO: Doing something like in testutils.py:UnifyValueType might be more
595 # efficient, though floats will be tricky
596 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
597 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
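# For instance, a (seconds, microseconds) timestamp tuple comes back from
# this JSON round-trip as a two-element list, which is what a LUXI client
# would have received as well.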
599 # Don't even try to wait if the job is no longer running, there will be
601 if (status not in (constants.JOB_STATUS_QUEUED,
602 constants.JOB_STATUS_RUNNING,
603 constants.JOB_STATUS_WAITLOCK) or
604 job_info != self._prev_job_info or
605 (log_entries and self._prev_log_serial != log_entries[0][0])):
606 logging.debug("Job %s changed", job.id)
607 return (job_info, log_entries)
612 class _JobFileChangesWaiter(object):
613 def __init__(self, filename):
614 """Initializes this class.
616 @type filename: string
617 @param filename: Path to job file
618 @raises errors.InotifyError: if the notifier cannot be setup
621 self._wm = pyinotify.WatchManager()
622 self._inotify_handler = \
623 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
625 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
627 self._inotify_handler.enable()
629 # pyinotify doesn't close file descriptors automatically
630 self._notifier.stop()
633 def _OnInotify(self, notifier_enabled):
634 """Callback for inotify.
637 if not notifier_enabled:
638 self._inotify_handler.enable()
640 def Wait(self, timeout):
641 """Waits for the job file to change.
644 @param timeout: Timeout in seconds
645 @return: Whether there have been events
649 have_events = self._notifier.check_events(timeout * 1000)
651 self._notifier.read_events()
652 self._notifier.process_events()
656 """Closes underlying notifier and its file descriptor.
659 self._notifier.stop()
662 class _JobChangesWaiter(object):
663 def __init__(self, filename):
664 """Initializes this class.
666 @type filename: string
667 @param filename: Path to job file
670 self._filewaiter = None
671 self._filename = filename
673 def Wait(self, timeout):
674 """Waits for a job to change.
677 @param timeout: Timeout in seconds
678 @return: Whether there have been events
682 return self._filewaiter.Wait(timeout)
684 # Lazy setup: Avoid inotify setup cost when job file has already changed.
685 # If this point is reached, return immediately and let caller check the job
686 # file again in case there were changes since the last check. This avoids a
688 self._filewaiter = _JobFileChangesWaiter(self._filename)
693 """Closes underlying waiter.
697 self._filewaiter.Close()
700 class _WaitForJobChangesHelper(object):
701 """Helper class using inotify to wait for changes in a job file.
703 This class takes a previous job status and serial, and alerts the client when
704 the current job status has changed.
708 def _CheckForChanges(job_load_fn, check_fn):
711 raise errors.JobLost()
713 result = check_fn(job)
715 raise utils.RetryAgain()
719 def __call__(self, filename, job_load_fn,
720 fields, prev_job_info, prev_log_serial, timeout):
721 """Waits for changes on a job.
723 @type filename: string
724 @param filename: File on which to wait for changes
725 @type job_load_fn: callable
726 @param job_load_fn: Function to load job
727 @type fields: list of strings
728 @param fields: Which fields to check for changes
729 @type prev_job_info: list or None
730 @param prev_job_info: Last job information returned
731 @type prev_log_serial: int
732 @param prev_log_serial: Last job message serial number
734 @param timeout: maximum time to wait in seconds
738 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
739 waiter = _JobChangesWaiter(filename)
741 return utils.Retry(compat.partial(self._CheckForChanges,
742 job_load_fn, check_fn),
743 utils.RETRY_REMAINING_TIME, timeout,
747 except (errors.InotifyError, errors.JobLost):
749 except utils.RetryTimeout:
750 return constants.JOB_NOTCHANGED
753 def _EncodeOpError(err):
754 """Encodes an error which occurred while processing an opcode.
757 if isinstance(err, errors.GenericError):
760 to_encode = errors.OpExecError(str(err))
762 return errors.EncodeException(to_encode)
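# The encoded exception is what ends up in op.result for failed opcodes;
# _JobProcessor later checks it again via errors.GetEncodedError().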
765 class _TimeoutStrategyWrapper:
766 def __init__(self, fn):
767 """Initializes this class.
774 """Gets the next timeout if necessary.
777 if self._next is None:
778 self._next = self._fn()
781 """Returns the next timeout.
788 """Returns the current timeout and advances the internal state.
797 class _OpExecContext:
798 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
799 """Initializes this class.
804 self.log_prefix = log_prefix
805 self.summary = op.input.Summary()
807 self._timeout_strategy_factory = timeout_strategy_factory
808 self._ResetTimeoutStrategy()
810 def _ResetTimeoutStrategy(self):
811 """Creates a new timeout strategy.
814 self._timeout_strategy = \
815 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
817 def CheckPriorityIncrease(self):
818 """Checks whether priority can and should be increased.
820 Called when locks couldn't be acquired.
825 # Exhausted all retries and next round should not use blocking acquire
827 if (self._timeout_strategy.Peek() is None and
828 op.priority > constants.OP_PRIO_HIGHEST):
829 logging.debug("Increasing priority")
831 self._ResetTimeoutStrategy()
836 def GetNextLockTimeout(self):
837 """Returns the next lock acquire timeout.
840 return self._timeout_strategy.Next()
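# A timeout of None from the strategy stands for a blocking lock acquire;
# _ExecOpCodeUnlocked below relies on this when it asserts that a
# LockAcquireTimeout can only be seen for a non-None timeout.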
843 class _JobProcessor(object):
844 def __init__(self, queue, opexec_fn, job,
845 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
846 """Initializes this class.
850 self.opexec_fn = opexec_fn
852 self._timeout_strategy_factory = _timeout_strategy_factory
855 def _FindNextOpcode(job, timeout_strategy_factory):
856 """Locates the next opcode to run.
858 @type job: L{_QueuedJob}
859 @param job: Job object
860 @param timeout_strategy_factory: Callable to create new timeout strategy
863 # Create some sort of a cache to speed up locating next opcode for future
865 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
866 # pending and one for processed ops.
867 if job.ops_iter is None:
868 job.ops_iter = enumerate(job.ops)
870 # Find next opcode to run
873 (idx, op) = job.ops_iter.next()
874 except StopIteration:
875 raise errors.ProgrammerError("Called for a finished job")
877 if op.status == constants.OP_STATUS_RUNNING:
878 # Found an opcode already marked as running
879 raise errors.ProgrammerError("Called for job marked as running")
881 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
882 timeout_strategy_factory)
884 if op.status not in constants.OPS_FINALIZED:
887 # This is a job that was partially completed before master daemon
888 # shutdown, so it can be expected that some opcodes are already
889 # completed successfully (if any did error out, then the whole job
890 # should have been aborted and not resubmitted for processing).
891 logging.info("%s: opcode %s already processed, skipping",
892 opctx.log_prefix, opctx.summary)
895 def _MarkWaitlock(job, op):
896 """Marks an opcode as waiting for locks.
898 The job's start timestamp is also set if necessary.
900 @type job: L{_QueuedJob}
901 @param job: Job object
902 @type op: L{_QueuedOpCode}
903 @param op: Opcode object
907 assert op.status in (constants.OP_STATUS_QUEUED,
908 constants.OP_STATUS_WAITLOCK)
914 if op.status == constants.OP_STATUS_QUEUED:
915 op.status = constants.OP_STATUS_WAITLOCK
918 if op.start_timestamp is None:
919 op.start_timestamp = TimeStampNow()
922 if job.start_timestamp is None:
923 job.start_timestamp = op.start_timestamp
926 assert op.status == constants.OP_STATUS_WAITLOCK
930 def _ExecOpCodeUnlocked(self, opctx):
931 """Processes one opcode and returns the result.
936 assert op.status == constants.OP_STATUS_WAITLOCK
938 timeout = opctx.GetNextLockTimeout()
941 # Make sure not to hold queue lock while calling ExecOpCode
942 result = self.opexec_fn(op.input,
943 _OpExecCallbacks(self.queue, self.job, op),
944 timeout=timeout, priority=op.priority)
945 except mcpu.LockAcquireTimeout:
946 assert timeout is not None, "Received timeout for blocking acquire"
947 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
949 assert op.status in (constants.OP_STATUS_WAITLOCK,
950 constants.OP_STATUS_CANCELING)
952 # Was job cancelled while we were waiting for the lock?
953 if op.status == constants.OP_STATUS_CANCELING:
954 return (constants.OP_STATUS_CANCELING, None)
956 # Stay in waitlock while trying to re-acquire lock
957 return (constants.OP_STATUS_WAITLOCK, None)
959 logging.exception("%s: Canceling job", opctx.log_prefix)
960 assert op.status == constants.OP_STATUS_CANCELING
961 return (constants.OP_STATUS_CANCELING, None)
962 except Exception, err: # pylint: disable-msg=W0703
963 logging.exception("%s: Caught exception in %s",
964 opctx.log_prefix, opctx.summary)
965 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
967 logging.debug("%s: %s successful",
968 opctx.log_prefix, opctx.summary)
969 return (constants.OP_STATUS_SUCCESS, result)
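# Summary of the visible return paths above: (CANCELING, None) if the job
# was canceled while waiting, (WAITLOCK, None) if the lock timeout expired
# and another attempt is needed, (ERROR, <encoded exception>) on failure and
# (SUCCESS, <LU result>) otherwise.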
971 def __call__(self, _nextop_fn=None):
972 """Continues execution of a job.
974 @param _nextop_fn: Callback function for tests
976 @return: True if job is finished, False if processor needs to be called
983 logging.debug("Processing job %s", job.id)
985 queue.acquire(shared=1)
987 opcount = len(job.ops)
989 # Don't do anything for finalized jobs
990 if job.CalcStatus() in constants.JOBS_FINALIZED:
993 # Is a previous opcode still pending?
995 opctx = job.cur_opctx
998 if __debug__ and _nextop_fn:
1000 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1005 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1006 constants.OP_STATUS_CANCELING)
1007 for i in job.ops[opctx.index + 1:])
1009 assert op.status in (constants.OP_STATUS_QUEUED,
1010 constants.OP_STATUS_WAITLOCK,
1011 constants.OP_STATUS_CANCELING)
1013 assert (op.priority <= constants.OP_PRIO_LOWEST and
1014 op.priority >= constants.OP_PRIO_HIGHEST)
1016 if op.status != constants.OP_STATUS_CANCELING:
1017 assert op.status in (constants.OP_STATUS_QUEUED,
1018 constants.OP_STATUS_WAITLOCK)
1020 # Prepare to start opcode
1021 if self._MarkWaitlock(job, op):
1023 queue.UpdateJobUnlocked(job)
1025 assert op.status == constants.OP_STATUS_WAITLOCK
1026 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1027 assert job.start_timestamp and op.start_timestamp
1029 logging.info("%s: opcode %s waiting for locks",
1030 opctx.log_prefix, opctx.summary)
1034 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1036 queue.acquire(shared=1)
1038 op.status = op_status
1039 op.result = op_result
1041 if op.status == constants.OP_STATUS_WAITLOCK:
1042 # Couldn't get locks in time
1043 assert not op.end_timestamp
1046 op.end_timestamp = TimeStampNow()
1048 if op.status == constants.OP_STATUS_CANCELING:
1049 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1050 for i in job.ops[opctx.index:])
1052 assert op.status in constants.OPS_FINALIZED
1054 if op.status == constants.OP_STATUS_WAITLOCK:
1057 if opctx.CheckPriorityIncrease():
1058 # Priority was changed, need to update on-disk file
1059 queue.UpdateJobUnlocked(job)
1061 # Keep around for another round
1062 job.cur_opctx = opctx
1064 assert (op.priority <= constants.OP_PRIO_LOWEST and
1065 op.priority >= constants.OP_PRIO_HIGHEST)
1067 # In no case must the status be finalized here
1068 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1071 # Ensure all opcodes so far have been successful
1072 assert (opctx.index == 0 or
1073 compat.all(i.status == constants.OP_STATUS_SUCCESS
1074 for i in job.ops[:opctx.index]))
1077 job.cur_opctx = None
1079 if op.status == constants.OP_STATUS_SUCCESS:
1082 elif op.status == constants.OP_STATUS_ERROR:
1083 # Ensure failed opcode has an exception as its result
1084 assert errors.GetEncodedError(job.ops[opctx.index].result)
1086 to_encode = errors.OpExecError("Preceding opcode failed")
1087 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1088 _EncodeOpError(to_encode))
1092 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1093 errors.GetEncodedError(i.result)
1094 for i in job.ops[opctx.index:])
1096 elif op.status == constants.OP_STATUS_CANCELING:
1097 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1098 "Job canceled by request")
1102 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1104 if opctx.index == (opcount - 1):
1105 # Finalize on last opcode
1109 # All opcodes have been run, finalize job
1112 # Write to disk. If the job status is final, this is the final write
1113 # allowed. Once the file has been written, it can be archived anytime.
1114 queue.UpdateJobUnlocked(job)
1117 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1125 class _JobQueueWorker(workerpool.BaseWorker):
1126 """The actual job workers.
1129 def RunTask(self, job): # pylint: disable-msg=W0221
1132 This function processes a job. It is closely tied to the L{_QueuedJob} and
1133 L{_QueuedOpCode} classes.
1135 @type job: L{_QueuedJob}
1136 @param job: the job to be processed
1140 assert queue == self.pool.queue
1142 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1145 proc = mcpu.Processor(queue.context, job.id)
1147 # Create wrapper for setting thread name
1148 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1151 if not _JobProcessor(queue, wrap_execop_fn, job)():
1153 raise workerpool.DeferTask(priority=job.CalcPriority())
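# A false result from _JobProcessor means the job still has opcodes left;
# DeferTask presumably puts the job back into the worker pool at its current
# priority instead of keeping this worker thread busy.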
1156 def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1157 """Updates the worker thread name to include a short summary of the opcode.
1159 @param setname_fn: Callable setting worker thread name
1160 @param execop_fn: Callable for executing opcode (usually
1161 L{mcpu.Processor.ExecOpCode})
1166 return execop_fn(op, *args, **kwargs)
1171 def _GetWorkerName(job, op):
1172 """Returns the worker thread name.
1174 @type job: L{_QueuedJob}
1175 @type op: L{opcodes.OpCode}
1178 parts = ["Job%s" % job.id]
1181 parts.append(op.TinySummary())
1183 return "/".join(parts)
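# The resulting thread name therefore looks roughly like
# "Job<id>/.../<opcode tiny summary>", depending on what else is appended to
# parts before the join.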
1186 class _JobQueueWorkerPool(workerpool.WorkerPool):
1187 """Simple class implementing a job-processing workerpool.
1190 def __init__(self, queue):
1191 super(_JobQueueWorkerPool, self).__init__("Jq",
1197 def _RequireOpenQueue(fn):
1198 """Decorator for "public" functions.
1200 This function should be used for all 'public' functions. That is,
1201 functions usually called from other classes. Note that this should
1202 be applied only to methods (not plain functions), since it expects
1203 that the decorated function is called with a first argument that has
1204 a '_queue_filelock' attribute.
1206 @warning: Use this decorator only after locking.ssynchronized
1209 @locking.ssynchronized(_LOCK)
1215 def wrapper(self, *args, **kwargs):
1216 # pylint: disable-msg=W0212
1217 assert self._queue_filelock is not None, "Queue should be open"
1218 return fn(self, *args, **kwargs)
1222 class JobQueue(object):
1223 """Queue used to manage the jobs.
1225 @cvar _RE_JOB_FILE: regex matching the valid job file names
1228 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1230 def __init__(self, context):
1231 """Constructor for JobQueue.
1233 The constructor will initialize the job queue object and then
1234 start loading the current jobs from disk, either for starting them
1235 (if they were queued) or for aborting them (if they were already
1238 @type context: GanetiContext
1239 @param context: the context object for access to the configuration
1240 data and other ganeti objects
1243 self.context = context
1244 self._memcache = weakref.WeakValueDictionary()
1245 self._my_hostname = netutils.Hostname.GetSysName()
1247 # The Big JobQueue lock. If a code block or method acquires it in shared
1248 # mode, it must guarantee concurrency safety with all the code acquiring it
1249 # in shared mode, including itself. In order not to acquire it at all,
1250 # concurrency must be guaranteed with all code acquiring it in shared mode
1251 # and all code acquiring it exclusively.
1252 self._lock = locking.SharedLock("JobQueue")
1254 self.acquire = self._lock.acquire
1255 self.release = self._lock.release
1257 # Initialize the queue, and acquire the filelock.
1258 # This ensures no other process is working on the job queue.
1259 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1262 self._last_serial = jstore.ReadSerial()
1263 assert self._last_serial is not None, ("Serial file was modified between"
1264 " check in jstore and here")
1266 # Get initial list of nodes
1267 self._nodes = dict((n.name, n.primary_ip)
1268 for n in self.context.cfg.GetAllNodesInfo().values()
1269 if n.master_candidate)
1271 # Remove master node
1272 self._nodes.pop(self._my_hostname, None)
1274 # TODO: Check consistency across nodes
1276 self._queue_size = 0
1277 self._UpdateQueueSizeUnlocked()
1278 self._drained = jstore.CheckDrainFlag()
1281 self._wpool = _JobQueueWorkerPool(self)
1283 self._InspectQueue()
1285 self._wpool.TerminateWorkers()
1288 @locking.ssynchronized(_LOCK)
1290 def _InspectQueue(self):
1291 """Loads the whole job queue and resumes unfinished jobs.
1293 This function needs the lock here because WorkerPool.AddTask() may start a
1294 job while we're still doing our work.
1297 logging.info("Inspecting job queue")
1301 all_job_ids = self._GetJobIDsUnlocked()
1302 jobs_count = len(all_job_ids)
1303 lastinfo = time.time()
1304 for idx, job_id in enumerate(all_job_ids):
1305 # Give an update every 1000 jobs or 10 seconds
1306 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1307 idx == (jobs_count - 1)):
1308 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1309 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1310 lastinfo = time.time()
1312 job = self._LoadJobUnlocked(job_id)
1314 # a failure in loading the job can cause 'None' to be returned
1318 status = job.CalcStatus()
1320 if status == constants.JOB_STATUS_QUEUED:
1321 restartjobs.append(job)
1323 elif status in (constants.JOB_STATUS_RUNNING,
1324 constants.JOB_STATUS_WAITLOCK,
1325 constants.JOB_STATUS_CANCELING):
1326 logging.warning("Unfinished job %s found: %s", job.id, job)
1328 if status == constants.JOB_STATUS_WAITLOCK:
1330 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1331 restartjobs.append(job)
1333 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1334 "Unclean master daemon shutdown")
1336 self.UpdateJobUnlocked(job)
1339 logging.info("Restarting %s jobs", len(restartjobs))
1340 self._EnqueueJobs(restartjobs)
1342 logging.info("Job queue inspection finished")
1344 @locking.ssynchronized(_LOCK)
1346 def AddNode(self, node):
1347 """Register a new node with the queue.
1349 @type node: L{objects.Node}
1350 @param node: the node object to be added
1353 node_name = node.name
1354 assert node_name != self._my_hostname
1356 # Clean queue directory on added node
1357 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1358 msg = result.fail_msg
1360 logging.warning("Cannot cleanup queue directory on node %s: %s",
1363 if not node.master_candidate:
1364 # remove if existing, ignoring errors
1365 self._nodes.pop(node_name, None)
1366 # and skip the replication of the job ids
1369 # Upload the whole queue excluding archived jobs
1370 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1372 # Upload current serial file
1373 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1375 for file_name in files:
1377 content = utils.ReadFile(file_name)
1379 result = rpc.RpcRunner.call_jobqueue_update([node_name],
1382 msg = result[node_name].fail_msg
1384 logging.error("Failed to upload file %s to node %s: %s",
1385 file_name, node_name, msg)
1387 self._nodes[node_name] = node.primary_ip
1389 @locking.ssynchronized(_LOCK)
1391 def RemoveNode(self, node_name):
1392 """Callback called when removing nodes from the cluster.
1394 @type node_name: str
1395 @param node_name: the name of the node to remove
1398 self._nodes.pop(node_name, None)
1401 def _CheckRpcResult(result, nodes, failmsg):
1402 """Verifies the status of an RPC call.
1404 Since we aim to keep consistency should this node (the current
1405 master) fail, we will log errors if our rpc calls fail, and especially
1406 log the case when more than half of the nodes fail.
1408 @param result: the data as returned from the rpc call
1410 @param nodes: the list of nodes we made the call to
1412 @param failmsg: the identifier to be used for logging
1419 msg = result[node].fail_msg
1422 logging.error("RPC call %s (%s) failed on node %s: %s",
1423 result[node].call, failmsg, node, msg)
1425 success.append(node)
1427 # +1 for the master node
1428 if (len(success) + 1) < len(failed):
1429 # TODO: Handle failing nodes
1430 logging.error("More than half of the nodes failed")
1432 def _GetNodeIp(self):
1433 """Helper for returning the node name/ip list.
1435 @rtype: (list, list)
1436 @return: a tuple of two lists, the first one with the node
1437 names and the second one with the node addresses
1440 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1441 name_list = self._nodes.keys()
1442 addr_list = [self._nodes[name] for name in name_list]
1443 return name_list, addr_list
1445 def _UpdateJobQueueFile(self, file_name, data, replicate):
1446 """Writes a file locally and then replicates it to all nodes.
1448 This function will replace the contents of a file on the local
1449 node and then replicate it to all the other nodes we have.
1451 @type file_name: str
1452 @param file_name: the path of the file to be replicated
1454 @param data: the new contents of the file
1455 @type replicate: boolean
1456 @param replicate: whether to spread the changes to the remote nodes
1459 getents = runtime.GetEnts()
1460 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1461 gid=getents.masterd_gid)
1464 names, addrs = self._GetNodeIp()
1465 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1466 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1468 def _RenameFilesUnlocked(self, rename):
1469 """Renames a file locally and then replicate the change.
1471 This function will rename a file in the local queue directory
1472 and then replicate this rename to all the other nodes we have.
1474 @type rename: list of (old, new)
1475 @param rename: List containing tuples mapping old to new names
1478 # Rename them locally
1479 for old, new in rename:
1480 utils.RenameFile(old, new, mkdir=True)
1482 # ... and on all nodes
1483 names, addrs = self._GetNodeIp()
1484 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1485 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1488 def _FormatJobID(job_id):
1489 """Convert a job ID to string format.
1491 Currently this just does C{str(job_id)} after performing some
1492 checks, but if we want to change the job id format this will
1493 abstract this change.
1495 @type job_id: int or long
1496 @param job_id: the numeric job id
1498 @return: the formatted job id
1501 if not isinstance(job_id, (int, long)):
1502 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1504 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1509 def _GetArchiveDirectory(cls, job_id):
1510 """Returns the archive directory for a job.
1513 @param job_id: Job identifier
1515 @return: Directory name
1518 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
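# Example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id 12345 maps to the
# archive subdirectory "1" (Python 2 integer division).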
1520 def _NewSerialsUnlocked(self, count):
1521 """Generates a new job identifier.
1523 Job identifiers are unique during the lifetime of a cluster.
1525 @type count: integer
1526 @param count: how many serials to return
1528 @return: a list of strings representing the job identifiers.
1533 serial = self._last_serial + count
1536 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1537 "%s\n" % serial, True)
1539 result = [self._FormatJobID(v)
1540 for v in range(self._last_serial + 1, serial + 1)]
1541 # Keep it only if we were able to write the file
1542 self._last_serial = serial
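# Illustrative run: with _last_serial == 41 and count == 3, the serial file
# is replicated with the value 44 and identifiers for 42, 43 and 44 are
# handed out.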
1547 def _GetJobPath(job_id):
1548 """Returns the job file for a given job id.
1551 @param job_id: the job identifier
1553 @return: the path to the job file
1556 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1559 def _GetArchivedJobPath(cls, job_id):
1560 """Returns the archived job file for a given job id.
1563 @param job_id: the job identifier
1565 @return: the path to the archived job file
1568 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1569 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1571 def _GetJobIDsUnlocked(self, sort=True):
1572 """Return all known job IDs.
1574 The method only looks at disk because it's a requirement that all
1575 jobs are present on disk (so in the _memcache we don't have any
1579 @param sort: perform sorting on the returned job ids
1581 @return: the list of job IDs
1585 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1586 m = self._RE_JOB_FILE.match(filename)
1588 jlist.append(m.group(1))
1590 jlist = utils.NiceSort(jlist)
1593 def _LoadJobUnlocked(self, job_id):
1594 """Loads a job from the disk or memory.
1596 Given a job id, this will return the cached job object if
1597 existing, or try to load the job from the disk. If loading from
1598 disk, it will also add the job to the cache.
1600 @param job_id: the job id
1601 @rtype: L{_QueuedJob} or None
1602 @return: either None or the job object
1605 job = self._memcache.get(job_id, None)
1607 logging.debug("Found job %s in memcache", job_id)
1611 job = self._LoadJobFromDisk(job_id, False)
1614 except errors.JobFileCorrupted:
1615 old_path = self._GetJobPath(job_id)
1616 new_path = self._GetArchivedJobPath(job_id)
1617 if old_path == new_path:
1618 # job already archived (future case)
1619 logging.exception("Can't parse job %s", job_id)
1622 logging.exception("Can't parse job %s, will archive.", job_id)
1623 self._RenameFilesUnlocked([(old_path, new_path)])
1626 self._memcache[job_id] = job
1627 logging.debug("Added job %s to the cache", job_id)
1630 def _LoadJobFromDisk(self, job_id, try_archived):
1631 """Load the given job file from disk.
1633 Given a job file, read, load and restore it in a _QueuedJob format.
1635 @type job_id: string
1636 @param job_id: job identifier
1637 @type try_archived: bool
1638 @param try_archived: Whether to try loading an archived job
1639 @rtype: L{_QueuedJob} or None
1640 @return: either None or the job object
1643 path_functions = [self._GetJobPath]
1646 path_functions.append(self._GetArchivedJobPath)
1650 for fn in path_functions:
1651 filepath = fn(job_id)
1652 logging.debug("Loading job from %s", filepath)
1654 raw_data = utils.ReadFile(filepath)
1655 except EnvironmentError, err:
1656 if err.errno != errno.ENOENT:
1665 data = serializer.LoadJson(raw_data)
1666 job = _QueuedJob.Restore(self, data)
1667 except Exception, err: # pylint: disable-msg=W0703
1668 raise errors.JobFileCorrupted(err)
1672 def SafeLoadJobFromDisk(self, job_id, try_archived):
1673 """Load the given job file from disk.
1675 Given a job file, read, load and restore it in a _QueuedJob format.
1676 In case of error reading the job, it gets returned as None, and the
1677 exception is logged.
1679 @type job_id: string
1680 @param job_id: job identifier
1681 @type try_archived: bool
1682 @param try_archived: Whether to try loading an archived job
1683 @rtype: L{_QueuedJob} or None
1684 @return: either None or the job object
1688 return self._LoadJobFromDisk(job_id, try_archived)
1689 except (errors.JobFileCorrupted, EnvironmentError):
1690 logging.exception("Can't load/parse job %s", job_id)
1693 def _UpdateQueueSizeUnlocked(self):
1694 """Update the queue size.
1697 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1699 @locking.ssynchronized(_LOCK)
1701 def SetDrainFlag(self, drain_flag):
1702 """Sets the drain flag for the queue.
1704 @type drain_flag: boolean
1705 @param drain_flag: Whether to set or unset the drain flag
1708 jstore.SetDrainFlag(drain_flag)
1710 self._drained = drain_flag
1715 def _SubmitJobUnlocked(self, job_id, ops):
1716 """Create and store a new job.
1718 This enters the job into our job queue and also puts it on the new
1719 queue, in order for it to be picked up by the queue processors.
1721 @type job_id: job ID
1722 @param job_id: the job ID for the new job
1724 @param ops: The list of OpCodes that will become the new job.
1725 @rtype: L{_QueuedJob}
1726 @return: the job object to be queued
1727 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1728 @raise errors.JobQueueFull: if the job queue has too many jobs in it
1729 @raise errors.GenericError: If an opcode is not valid
1732 # Ok when sharing the big job queue lock, as the drain file is created when
1733 # the lock is exclusive.
1735 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1737 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1738 raise errors.JobQueueFull()
1740 job = _QueuedJob(self, job_id, ops)
1743 for idx, op in enumerate(job.ops):
1744 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1745 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1746 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1747 " are %s" % (idx, op.priority, allowed))
1750 self.UpdateJobUnlocked(job)
1752 self._queue_size += 1
1754 logging.debug("Adding new job %s to the cache", job_id)
1755 self._memcache[job_id] = job
1759 @locking.ssynchronized(_LOCK)
1761 def SubmitJob(self, ops):
1762 """Create and store a new job.
1764 @see: L{_SubmitJobUnlocked}
1767 job_id = self._NewSerialsUnlocked(1)[0]
1768 self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1771 @locking.ssynchronized(_LOCK)
1773 def SubmitManyJobs(self, jobs):
1774 """Create and store multiple jobs.
1776 @see: L{_SubmitJobUnlocked}
1781 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1782 for job_id, ops in zip(all_job_ids, jobs):
1784 added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1787 except errors.GenericError, err:
1788 data = ("%s; opcodes %s" %
1789 (err, utils.CommaJoin(op.Summary() for op in ops)))
1791 results.append((status, data))
1793 self._EnqueueJobs(added_jobs)
1797 def _EnqueueJobs(self, jobs):
1798 """Helper function to add jobs to worker pool's queue.
1801 @param jobs: List of all jobs
1804 self._wpool.AddManyTasks([(job, ) for job in jobs],
1805 priority=[job.CalcPriority() for job in jobs])
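# Each job is wrapped in a one-element tuple because the worker pool
# presumably unpacks task arguments when calling _JobQueueWorker.RunTask(job).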
1808 def UpdateJobUnlocked(self, job, replicate=True):
1809 """Update a job's on disk storage.
1811 After a job has been modified, this function needs to be called in
1812 order to write the changes to disk and replicate them to the other
1815 @type job: L{_QueuedJob}
1816 @param job: the changed job
1817 @type replicate: boolean
1818 @param replicate: whether to replicate the change to remote nodes
1822 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1823 assert (finalized ^ (job.end_timestamp is None))
1825 filename = self._GetJobPath(job.id)
1826 data = serializer.DumpJson(job.Serialize(), indent=False)
1827 logging.debug("Writing job %s to %s", job.id, filename)
1828 self._UpdateJobQueueFile(filename, data, replicate)
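# Each job lives in its own JSON file; the serialized form is roughly the
# dictionary built by _QueuedJob.Serialize() above ("ops", the various
# timestamps, and so on).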
1830 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1832 """Waits for changes in a job.
1834 @type job_id: string
1835 @param job_id: Job identifier
1836 @type fields: list of strings
1837 @param fields: Which fields to check for changes
1838 @type prev_job_info: list or None
1839 @param prev_job_info: Last job information returned
1840 @type prev_log_serial: int
1841 @param prev_log_serial: Last job message serial number
1842 @type timeout: float
1843 @param timeout: maximum time to wait in seconds
1844 @rtype: tuple (job info, log entries)
1845 @return: a tuple of the job information as required via
1846 the fields parameter, and the log entries as a list
1848 if the job has not changed and the timeout has expired,
1849 we instead return a special value,
1850 L{constants.JOB_NOTCHANGED}, which should be interpreted
1851 as such by the clients
1854 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)
1856 helper = _WaitForJobChangesHelper()
1858 return helper(self._GetJobPath(job_id), load_fn,
1859 fields, prev_job_info, prev_log_serial, timeout)
1861 @locking.ssynchronized(_LOCK)
1863 def CancelJob(self, job_id):
1866 This will only succeed if the job has not started yet.
1868 @type job_id: string
1869 @param job_id: job ID of job to be cancelled.
1872 logging.info("Cancelling job %s", job_id)
1874 job = self._LoadJobUnlocked(job_id)
1876 logging.debug("Job %s not found", job_id)
1877 return (False, "Job %s not found" % job_id)
1879 (success, msg) = job.Cancel()
1882 # If the job was finalized (e.g. cancelled), this is the final write
1883 # allowed. The job can be archived anytime.
1884 self.UpdateJobUnlocked(job)
1886 return (success, msg)
1889 def _ArchiveJobsUnlocked(self, jobs):
1892 @type jobs: list of L{_QueuedJob}
1893 @param jobs: Job objects
1895 @return: Number of archived jobs
1901 if job.CalcStatus() not in constants.JOBS_FINALIZED:
1902 logging.debug("Job %s is not yet done", job.id)
1905 archive_jobs.append(job)
1907 old = self._GetJobPath(job.id)
1908 new = self._GetArchivedJobPath(job.id)
1909 rename_files.append((old, new))
1911 # TODO: What if 1..n files fail to rename?
1912 self._RenameFilesUnlocked(rename_files)
1914 logging.debug("Successfully archived job(s) %s",
1915 utils.CommaJoin(job.id for job in archive_jobs))
1917 # Since we haven't quite checked, above, if we succeeded or failed renaming
1918 # the files, we update the cached queue size from the filesystem. When we
1919 # get around to fix the TODO: above, we can use the number of actually
1920 # archived jobs to fix this.
1921 self._UpdateQueueSizeUnlocked()
1922 return len(archive_jobs)
1924 @locking.ssynchronized(_LOCK)
1926 def ArchiveJob(self, job_id):
1929 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1931 @type job_id: string
1932 @param job_id: Job ID of job to be archived.
1934 @return: Whether job was archived
1937 logging.info("Archiving job %s", job_id)
1939 job = self._LoadJobUnlocked(job_id)
1941 logging.debug("Job %s not found", job_id)
1944 return self._ArchiveJobsUnlocked([job]) == 1
1946 @locking.ssynchronized(_LOCK)
1948 def AutoArchiveJobs(self, age, timeout):
1949 """Archives all jobs based on age.
1951 The method will archive all jobs which are older than the age
1952 parameter. For jobs that don't have an end timestamp, the start
1953 timestamp will be considered. The special '-1' age will cause
1954 archival of all jobs (that are not running or queued).
1957 @param age: the minimum age in seconds
1960 logging.info("Archiving jobs with age more than %s seconds", age)
1963 end_time = now + timeout
1967 all_job_ids = self._GetJobIDsUnlocked()
1969 for idx, job_id in enumerate(all_job_ids):
1970 last_touched = idx + 1
1972 # Not optimal because jobs could be pending
1973 # TODO: Measure average duration for job archival and take number of
1974 # pending jobs into account.
1975 if time.time() > end_time:
1978 # Returns None if the job failed to load
1979 job = self._LoadJobUnlocked(job_id)
1981 if job.end_timestamp is None:
1982 if job.start_timestamp is None:
1983 job_age = job.received_timestamp
1985 job_age = job.start_timestamp
1987 job_age = job.end_timestamp
1989 if age == -1 or now - job_age[0] > age:
1992 # Archive 10 jobs at a time
1993 if len(pending) >= 10:
1994 archived_count += self._ArchiveJobsUnlocked(pending)
1998 archived_count += self._ArchiveJobsUnlocked(pending)
2000 return (archived_count, len(all_job_ids) - last_touched)
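# The second element of the returned tuple counts the job ids that were not
# even considered because the timeout expired first.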
2002 def QueryJobs(self, job_ids, fields):
2003 """Returns a list of jobs in queue.
2006 @param job_ids: sequence of job identifiers or None for all
2008 @param fields: names of fields to return
2010 @return: list one element per job, each element being list with
2011 the requested fields
2017 # Since files are added to/removed from the queue atomically, there's no
2018 # risk of getting the job ids in an inconsistent state.
2019 job_ids = self._GetJobIDsUnlocked()
2022 for job_id in job_ids:
2023 job = self.SafeLoadJobFromDisk(job_id, True)
2025 jobs.append(job.GetInfo(fields))
2031 @locking.ssynchronized(_LOCK)
2034 """Stops the job queue.
2036 This shuts down all the worker threads and closes the queue.
2039 self._wpool.TerminateWorkers()
2041 self._queue_filelock.Close()
2042 self._queue_filelock = None