# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
import errno
import logging
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify # pylint: disable-msg=E0611
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
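

# Illustrative example (values hypothetical): utils.SplitTime() splits a
# float timestamp into integer (seconds, microseconds) components, e.g.
#   >>> utils.SplitTime(1291231200.579)
#   (1291231200, 579000)
# so timestamps survive JSON serialization without float precision issues.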


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
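
  # Illustrative note: priorities are nice-style numbers where a *lower*
  # value means a *higher* priority, hence the min() above. For example,
  # unfinished opcode priorities of [0, -10, 10] yield a job priority of
  # -10, the most urgent one.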

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
        as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
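
  # Illustrative example (hypothetical job id): for job 42 still in
  # JOB_STATUS_QUEUED, Cancel() returns (True, "Job 42 canceled"); once an
  # opcode has started running it returns
  # (False, "Job 42 is no longer waiting in the queue").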


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None
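
  # Illustrative example of the normalization above: JSON has no tuple
  # type, so
  #   serializer.LoadJson(serializer.DumpJson((1291231200, 579000)))
  # comes back as the list [1291231200, 579000], the same shape a LUXI
  # client would have received, keeping the comparison with prev_job_info
  # meaningful.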


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
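

# Illustrative sketch (hypothetical error): a non-Ganeti exception such as
# ValueError("bad input") is first wrapped as errors.OpExecError("bad input")
# and then passed through errors.EncodeException(), so clients always
# receive an encoded Ganeti error they know how to decode.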


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
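

# Illustrative usage (hypothetical strategy yielding 1.0, 2.0, then None):
#   wrapper = _TimeoutStrategyWrapper(strategy.NextAttempt)
#   wrapper.Peek()  # -> 1.0, does not consume the value
#   wrapper.Next()  # -> 1.0, consumes it; subsequent Peek()/Next() see 2.0
# A Peek() result of None means retries are exhausted and the next lock
# acquisition should block.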


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])

      elif op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job should be updated on disk

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITLOCK
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITLOCK

    return update

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITLOCK, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
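
  # Note: the method always returns an (opcode status, result) pair, e.g.
  # (constants.OP_STATUS_WAITLOCK, None) after a lock timeout or
  # (constants.OP_STATUS_SUCCESS, result) on success, which __call__ below
  # uses to decide between retrying, cancelling and finalizing.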

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      if op.status not in (constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED):
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITLOCK)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
        assert job.start_timestamp and op.start_timestamp

        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        queue.release()
        try:
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = op_status
        op.result = op_result

        if op.status == constants.OP_STATUS_WAITLOCK:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITLOCK:
        finalize = False

        if opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        elif op.status == constants.OP_STATUS_CANCELED:
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        # Finalizing or last opcode?
        if finalize or opctx.index == (opcount - 1):
          # All opcodes have been run, finalize job
          job.end_timestamp = TimeStampNow()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize or opctx.index == (opcount - 1):
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return True

      return False
    finally:
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the L{_QueuedJob} and
    L{_QueuedOpCode} classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    proc = mcpu.Processor(queue.context, job.id)

    if not _JobProcessor(queue, proc.ExecOpCode, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())
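
  # Illustrative sketch (not literal code): _JobProcessor()() returns False
  # while opcodes remain, so a three-opcode job is processed as three
  # separate worker-pool tasks, conceptually:
  #   while not _JobProcessor(queue, proc.ExecOpCode, job)():
  #     pass  # achieved here via DeferTask instead of a loop
  # DeferTask re-evaluates CalcPriority(), letting more urgent jobs run
  # between this job's opcodes.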


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = jstore.CheckDrainFlag()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITLOCK:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
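
  # Illustrative example (hypothetical nodes): with self._nodes equal to
  # {"node2": "192.0.2.2", "node3": "192.0.2.3"}, this returns the parallel
  # lists (["node2", "node3"], ["192.0.2.2", "192.0.2.3"]) expected by the
  # jobqueue RPC helpers below.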

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
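
  # Worked example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000 and Python 2
  # integer division, jobs "0".."9999" map to directory "0" while job
  # "15432" maps to directory "1" (15432 / 10000 == 1).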

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
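
  # Worked example: if _last_serial is 41 and count is 3, the serial file
  # is written as "44" and the method returns ["42", "43", "44"]; starting
  # the range at _last_serial + 1 yields exactly `count` fresh identifiers.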

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])
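
  # Illustrative expansion for two jobs:
  #   self._wpool.AddManyTasks([(job1, ), (job2, )],
  #                            priority=[job1.CalcPriority(),
  #                                      job2.CalcPriority()])
  # i.e. one single-element argument tuple per task plus a parallel list of
  # per-task priorities.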

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
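
  # Worked example: with age=3600 and a job whose end_timestamp is
  # (now - 7200, 0), the test `now - job_age[0] > age` is 7200 > 3600, so
  # the job is batched for archival; age == -1 archives every job that is
  # already finalized.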

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None