4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
@var JOBQUEUE_THREADS: the number of worker threads we start for processing jobs
40 # pylint: disable-msg=E0611
41 from pyinotify import pyinotify
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
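# For example, with 10000 jobs per directory, job 123456 is archived under
# directory '12' (see _GetArchiveDirectory)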
64 # member lock names to be passed to @ssynchronized decorator
69 class CancelJob(Exception):
70 """Special exception to cancel a job.
76 """Returns the current timestamp.
79 @return: the current time in the (seconds, microseconds) format
82 return utils.SplitTime(time.time())
85 class _QueuedOpCode(object):
86 """Encapsulates an opcode object.
88 @ivar log: holds the execution log and consists of tuples
89 of the form C{(log_serial, timestamp, level, message)}
90 @ivar input: the OpCode we encapsulate
91 @ivar status: the current status
92 @ivar result: the result of the LU execution
93 @ivar start_timestamp: timestamp for the start of the execution
94 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 @ivar stop_timestamp: timestamp for the end of the execution
98 __slots__ = ["input", "status", "result", "log", "priority",
99 "start_timestamp", "exec_timestamp", "end_timestamp",
102 def __init__(self, op):
103 """Constructor for the _QuededOpCode.
105 @type op: L{opcodes.OpCode}
106 @param op: the opcode we encapsulate
110 self.status = constants.OP_STATUS_QUEUED
113 self.start_timestamp = None
114 self.exec_timestamp = None
115 self.end_timestamp = None
117 # Get initial priority (it might change during the lifetime of this opcode)
118 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
121 def Restore(cls, state):
122 """Restore the _QueuedOpCode from the serialized form.
125 @param state: the serialized state
126 @rtype: _QueuedOpCode
127 @return: a new _QueuedOpCode instance
130 obj = _QueuedOpCode.__new__(cls)
131 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132 obj.status = state["status"]
133 obj.result = state["result"]
134 obj.log = state["log"]
135 obj.start_timestamp = state.get("start_timestamp", None)
136 obj.exec_timestamp = state.get("exec_timestamp", None)
137 obj.end_timestamp = state.get("end_timestamp", None)
138 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
142 """Serializes this _QueuedOpCode.
145 @return: the dictionary holding the serialized state
149 "input": self.input.__getstate__(),
150 "status": self.status,
151 "result": self.result,
153 "start_timestamp": self.start_timestamp,
154 "exec_timestamp": self.exec_timestamp,
155 "end_timestamp": self.end_timestamp,
156 "priority": self.priority,
160 class _QueuedJob(object):
161 """In-memory job representation.
163 This is what we use to track the user-submitted jobs. Locking must
164 be taken care of by users of this class.
166 @type queue: L{JobQueue}
167 @ivar queue: the parent queue
170 @ivar ops: the list of _QueuedOpCode that constitute the job
171 @type log_serial: int
172 @ivar log_serial: holds the index for the next log entry
173 @ivar received_timestamp: the timestamp for when the job was received
@ivar start_timestamp: the timestamp for start of execution
175 @ivar end_timestamp: the timestamp for end of execution
178 # pylint: disable-msg=W0212
179 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 "received_timestamp", "start_timestamp", "end_timestamp",
183 def __init__(self, queue, job_id, ops):
184 """Constructor for the _QueuedJob.
186 @type queue: L{JobQueue}
187 @param queue: our parent queue
189 @param job_id: our job id
@param ops: the list of opcodes we hold, which will be encapsulated in _QueuedOpCodes
196 raise errors.GenericError("A job needs at least one opcode")
200 self.ops = [_QueuedOpCode(op) for op in ops]
202 self.received_timestamp = TimeStampNow()
203 self.start_timestamp = None
204 self.end_timestamp = None
206 self._InitInMemory(self)
209 def _InitInMemory(obj):
210 """Initializes in-memory variables.
217 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
219 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
221 return "<%s at %#x>" % (" ".join(status), id(self))
224 def Restore(cls, queue, state):
225 """Restore a _QueuedJob from serialized state:
227 @type queue: L{JobQueue}
228 @param queue: to which queue the restored job belongs
230 @param state: the serialized state
@return: the restored _QueuedJob instance
235 obj = _QueuedJob.__new__(cls)
238 obj.received_timestamp = state.get("received_timestamp", None)
239 obj.start_timestamp = state.get("start_timestamp", None)
240 obj.end_timestamp = state.get("end_timestamp", None)
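# Restore the opcodes and track the highest log serial seen, so that new
# log entries continue from the correct index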
244 for op_state in state["ops"]:
245 op = _QueuedOpCode.Restore(op_state)
246 for log_entry in op.log:
247 obj.log_serial = max(obj.log_serial, log_entry[0])
250 cls._InitInMemory(obj)
255 """Serialize the _JobQueue instance.
258 @return: the serialized state
263 "ops": [op.Serialize() for op in self.ops],
264 "start_timestamp": self.start_timestamp,
265 "end_timestamp": self.end_timestamp,
266 "received_timestamp": self.received_timestamp,
269 def CalcStatus(self):
270 """Compute the status of this job.
272 This function iterates over all the _QueuedOpCodes in the job and
273 based on their status, computes the job status.
The algorithm is:
  - if we find a canceled opcode, or one that finished with an error,
    the job status will be the same
  - otherwise, the last opcode with a status of:
      - waitlock
      - canceling
      - running
    will determine the job status
  - otherwise, all opcodes are either queued or successful, and the
    job status will be the same
288 @return: the job status
291 status = constants.JOB_STATUS_QUEUED
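# Walk the opcodes in order: the first canceled or failed opcode decides the
# job status; otherwise the most advanced unfinished opcode does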
295 if op.status == constants.OP_STATUS_SUCCESS:
300 if op.status == constants.OP_STATUS_QUEUED:
302 elif op.status == constants.OP_STATUS_WAITLOCK:
303 status = constants.JOB_STATUS_WAITLOCK
304 elif op.status == constants.OP_STATUS_RUNNING:
305 status = constants.JOB_STATUS_RUNNING
306 elif op.status == constants.OP_STATUS_CANCELING:
307 status = constants.JOB_STATUS_CANCELING
309 elif op.status == constants.OP_STATUS_ERROR:
310 status = constants.JOB_STATUS_ERROR
311 # The whole job fails if one opcode failed
313 elif op.status == constants.OP_STATUS_CANCELED:
status = constants.JOB_STATUS_CANCELED
318 status = constants.JOB_STATUS_SUCCESS
322 def CalcPriority(self):
323 """Gets the current priority for this job.
Only unfinished opcodes are considered. When all are done, the default priority is used.
331 priorities = [op.priority for op in self.ops
332 if op.status not in constants.OPS_FINALIZED]
335 # All opcodes are done, assume default priority
336 return constants.OP_PRIO_DEFAULT
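# Lower values mean higher priority, so the job runs at the priority of its
# most urgent pending opcode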
338 return min(priorities)
340 def GetLogEntries(self, newer_than):
341 """Selectively returns the log entries.
343 @type newer_than: None or int
344 @param newer_than: if this is None, return all log entries,
otherwise return only the log entries with serial higher than this value
348 @return: the list of the log entries selected
351 if newer_than is None:
358 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
362 def GetInfo(self, fields):
363 """Returns information about a job.
366 @param fields: names of fields to return
368 @return: list with one element for each field
@raise errors.OpExecError: when an invalid field is passed
377 elif fname == "status":
378 row.append(self.CalcStatus())
379 elif fname == "priority":
380 row.append(self.CalcPriority())
382 row.append([op.input.__getstate__() for op in self.ops])
383 elif fname == "opresult":
384 row.append([op.result for op in self.ops])
385 elif fname == "opstatus":
386 row.append([op.status for op in self.ops])
387 elif fname == "oplog":
388 row.append([op.log for op in self.ops])
389 elif fname == "opstart":
390 row.append([op.start_timestamp for op in self.ops])
391 elif fname == "opexec":
392 row.append([op.exec_timestamp for op in self.ops])
393 elif fname == "opend":
394 row.append([op.end_timestamp for op in self.ops])
395 elif fname == "oppriority":
396 row.append([op.priority for op in self.ops])
397 elif fname == "received_ts":
398 row.append(self.received_timestamp)
399 elif fname == "start_ts":
400 row.append(self.start_timestamp)
401 elif fname == "end_ts":
402 row.append(self.end_timestamp)
403 elif fname == "summary":
404 row.append([op.input.Summary() for op in self.ops])
406 raise errors.OpExecError("Invalid self query field '%s'" % fname)
409 def MarkUnfinishedOps(self, status, result):
410 """Mark unfinished opcodes with a given status and result.
This is a utility function for marking all running or waiting to
413 be run opcodes with a given status. Opcodes which are already
414 finalised are not changed.
416 @param status: a given opcode status
417 @param result: the opcode result
422 if op.status in constants.OPS_FINALIZED:
423 assert not_marked, "Finalized opcodes found after non-finalized ones"
430 """Marks job as canceled/-ing if possible.
432 @rtype: tuple; (bool, string)
433 @return: Boolean describing whether job was successfully canceled or marked
434 as canceling and a text message
437 status = self.CalcStatus()
439 if status == constants.JOB_STATUS_QUEUED:
440 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441 "Job canceled by request")
442 return (True, "Job %s canceled" % self.id)
444 elif status == constants.JOB_STATUS_WAITLOCK:
445 # The worker will notice the new status and cancel the job
446 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
447 return (True, "Job %s will be canceled" % self.id)
450 logging.debug("Job %s is no longer waiting in the queue", self.id)
451 return (False, "Job %s is no longer waiting in the queue" % self.id)
454 class _OpExecCallbacks(mcpu.OpExecCbBase):
455 def __init__(self, queue, job, op):
456 """Initializes this class.
458 @type queue: L{JobQueue}
459 @param queue: Job queue
460 @type job: L{_QueuedJob}
461 @param job: Job object
462 @type op: L{_QueuedOpCode}
466 assert queue, "Queue is missing"
467 assert job, "Job is missing"
468 assert op, "Opcode is missing"
474 def _CheckCancel(self):
475 """Raises an exception to cancel the job if asked to.
478 # Cancel here if we were asked to
479 if self._op.status == constants.OP_STATUS_CANCELING:
480 logging.debug("Canceling opcode")
483 @locking.ssynchronized(_QUEUE, shared=1)
484 def NotifyStart(self):
485 """Mark the opcode as running, not lock-waiting.
487 This is called from the mcpu code as a notifier function, when the LU is
488 finally about to start the Exec() method. Of course, to have end-user
489 visible results, the opcode must be initially (before calling into
490 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
493 assert self._op in self._job.ops
494 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
495 constants.OP_STATUS_CANCELING)
497 # Cancel here if we were asked to
500 logging.debug("Opcode is now running")
502 self._op.status = constants.OP_STATUS_RUNNING
503 self._op.exec_timestamp = TimeStampNow()
505 # And finally replicate the job status
506 self._queue.UpdateJobUnlocked(self._job)
508 @locking.ssynchronized(_QUEUE, shared=1)
509 def _AppendFeedback(self, timestamp, log_type, log_msg):
510 """Internal feedback append function, with locks
513 self._job.log_serial += 1
514 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
515 self._queue.UpdateJobUnlocked(self._job, replicate=False)
517 def Feedback(self, *args):
518 """Append a log entry.
524 log_type = constants.ELOG_MESSAGE
527 (log_type, log_msg) = args
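# Log entries are stored as (serial, timestamp, type, message) tuples,
# see _AppendFeedback above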
# The time is split to make serialization easier and not lose precision.
531 timestamp = utils.SplitTime(time.time())
532 self._AppendFeedback(timestamp, log_type, log_msg)
534 def CheckCancel(self):
535 """Check whether job has been cancelled.
538 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
539 constants.OP_STATUS_CANCELING)
541 # Cancel here if we were asked to
545 class _JobChangesChecker(object):
546 def __init__(self, fields, prev_job_info, prev_log_serial):
547 """Initializes this class.
549 @type fields: list of strings
550 @param fields: Fields requested by LUXI client
@type prev_job_info: list or None
@param prev_job_info: previous job info, as passed by the LUXI client
@type prev_log_serial: int
@param prev_log_serial: previous job log serial, as passed by the LUXI client
557 self._fields = fields
558 self._prev_job_info = prev_job_info
559 self._prev_log_serial = prev_log_serial
561 def __call__(self, job):
562 """Checks whether job has changed.
564 @type job: L{_QueuedJob}
565 @param job: Job object
568 status = job.CalcStatus()
569 job_info = job.GetInfo(self._fields)
570 log_entries = job.GetLogEntries(self._prev_log_serial)
572 # Serializing and deserializing data can cause type changes (e.g. from
573 # tuple to list) or precision loss. We're doing it here so that we get
574 # the same modifications as the data received from the client. Without
575 # this, the comparison afterwards might fail without the data being
576 # significantly different.
577 # TODO: we just deserialized from disk, investigate how to make sure that
578 # the job info and log entries are compatible to avoid this further step.
579 # TODO: Doing something like in testutils.py:UnifyValueType might be more
580 # efficient, though floats will be tricky
581 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
582 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
# Don't even try to wait if the job is no longer running, there will be no changes.
586 if (status not in (constants.JOB_STATUS_QUEUED,
587 constants.JOB_STATUS_RUNNING,
588 constants.JOB_STATUS_WAITLOCK) or
589 job_info != self._prev_job_info or
590 (log_entries and self._prev_log_serial != log_entries[0][0])):
591 logging.debug("Job %s changed", job.id)
592 return (job_info, log_entries)
597 class _JobFileChangesWaiter(object):
598 def __init__(self, filename):
599 """Initializes this class.
601 @type filename: string
602 @param filename: Path to job file
@raises errors.InotifyError: if the notifier cannot be set up
606 self._wm = pyinotify.WatchManager()
607 self._inotify_handler = \
608 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
610 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
612 self._inotify_handler.enable()
614 # pyinotify doesn't close file descriptors automatically
615 self._notifier.stop()
618 def _OnInotify(self, notifier_enabled):
619 """Callback for inotify.
622 if not notifier_enabled:
623 self._inotify_handler.enable()
625 def Wait(self, timeout):
626 """Waits for the job file to change.
629 @param timeout: Timeout in seconds
630 @return: Whether there have been events
634 have_events = self._notifier.check_events(timeout * 1000)
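# check_events() takes its timeout in milliseconds, hence the multiplication by 1000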
636 self._notifier.read_events()
637 self._notifier.process_events()
641 """Closes underlying notifier and its file descriptor.
644 self._notifier.stop()
647 class _JobChangesWaiter(object):
648 def __init__(self, filename):
649 """Initializes this class.
651 @type filename: string
652 @param filename: Path to job file
655 self._filewaiter = None
656 self._filename = filename
658 def Wait(self, timeout):
659 """Waits for a job to change.
662 @param timeout: Timeout in seconds
663 @return: Whether there have been events
667 return self._filewaiter.Wait(timeout)
669 # Lazy setup: Avoid inotify setup cost when job file has already changed.
670 # If this point is reached, return immediately and let caller check the job
# file again in case there were changes since the last check. This avoids a race condition.
673 self._filewaiter = _JobFileChangesWaiter(self._filename)
678 """Closes underlying waiter.
682 self._filewaiter.Close()
685 class _WaitForJobChangesHelper(object):
686 """Helper class using inotify to wait for changes in a job file.
688 This class takes a previous job status and serial, and alerts the client when
689 the current job status has changed.
693 def _CheckForChanges(job_load_fn, check_fn):
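# Reload the job from disk; if it has disappeared the wait is over (JobLost),
# and if the checker reports no change yet, utils.Retry is asked to try again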
696 raise errors.JobLost()
698 result = check_fn(job)
700 raise utils.RetryAgain()
704 def __call__(self, filename, job_load_fn,
705 fields, prev_job_info, prev_log_serial, timeout):
706 """Waits for changes on a job.
708 @type filename: string
709 @param filename: File on which to wait for changes
710 @type job_load_fn: callable
711 @param job_load_fn: Function to load job
712 @type fields: list of strings
713 @param fields: Which fields to check for changes
714 @type prev_job_info: list or None
715 @param prev_job_info: Last job information returned
716 @type prev_log_serial: int
717 @param prev_log_serial: Last job message serial number
719 @param timeout: maximum time to wait in seconds
723 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724 waiter = _JobChangesWaiter(filename)
726 return utils.Retry(compat.partial(self._CheckForChanges,
727 job_load_fn, check_fn),
728 utils.RETRY_REMAINING_TIME, timeout,
732 except (errors.InotifyError, errors.JobLost):
734 except utils.RetryTimeout:
735 return constants.JOB_NOTCHANGED
738 def _EncodeOpError(err):
739 """Encodes an error which occurred while processing an opcode.
742 if isinstance(err, errors.GenericError):
745 to_encode = errors.OpExecError(str(err))
747 return errors.EncodeException(to_encode)
750 class _TimeoutStrategyWrapper:
751 def __init__(self, fn):
752 """Initializes this class.
759 """Gets the next timeout if necessary.
762 if self._next is None:
763 self._next = self._fn()
766 """Returns the next timeout.
773 """Returns the current timeout and advances the internal state.
782 class _OpExecContext:
783 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784 """Initializes this class.
789 self.log_prefix = log_prefix
790 self.summary = op.input.Summary()
792 self._timeout_strategy_factory = timeout_strategy_factory
793 self._ResetTimeoutStrategy()
795 def _ResetTimeoutStrategy(self):
796 """Creates a new timeout strategy.
799 self._timeout_strategy = \
800 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
802 def CheckPriorityIncrease(self):
803 """Checks whether priority can and should be increased.
805 Called when locks couldn't be acquired.
810 # Exhausted all retries and next round should not use blocking acquire
812 if (self._timeout_strategy.Peek() is None and
813 op.priority > constants.OP_PRIO_HIGHEST):
814 logging.debug("Increasing priority")
816 self._ResetTimeoutStrategy()
821 def GetNextLockTimeout(self):
822 """Returns the next lock acquire timeout.
825 return self._timeout_strategy.Next()
828 class _JobProcessor(object):
829 def __init__(self, queue, opexec_fn, job,
830 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831 """Initializes this class.
835 self.opexec_fn = opexec_fn
837 self._timeout_strategy_factory = _timeout_strategy_factory
840 def _FindNextOpcode(job, timeout_strategy_factory):
841 """Locates the next opcode to run.
843 @type job: L{_QueuedJob}
844 @param job: Job object
845 @param timeout_strategy_factory: Callable to create new timeout strategy
# Create some sort of a cache to speed up locating next opcode for future lookups
850 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
851 # pending and one for processed ops.
852 if job.ops_iter is None:
853 job.ops_iter = enumerate(job.ops)
855 # Find next opcode to run
858 (idx, op) = job.ops_iter.next()
859 except StopIteration:
860 raise errors.ProgrammerError("Called for a finished job")
862 if op.status == constants.OP_STATUS_RUNNING:
863 # Found an opcode already marked as running
864 raise errors.ProgrammerError("Called for job marked as running")
866 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867 timeout_strategy_factory)
869 if op.status == constants.OP_STATUS_CANCELED:
870 # Cancelled jobs are handled by the caller
871 assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872 for i in job.ops[idx:])
874 elif op.status in constants.OPS_FINALIZED:
875 # This is a job that was partially completed before master daemon
876 # shutdown, so it can be expected that some opcodes are already
877 # completed successfully (if any did error out, then the whole job
878 # should have been aborted and not resubmitted for processing).
879 logging.info("%s: opcode %s already processed, skipping",
880 opctx.log_prefix, opctx.summary)
886 def _MarkWaitlock(job, op):
887 """Marks an opcode as waiting for locks.
889 The job's start timestamp is also set if necessary.
891 @type job: L{_QueuedJob}
892 @param job: Job object
893 @type op: L{_QueuedOpCode}
894 @param op: Opcode object
899 op.status = constants.OP_STATUS_WAITLOCK
901 op.start_timestamp = TimeStampNow()
903 if job.start_timestamp is None:
904 job.start_timestamp = op.start_timestamp
906 def _ExecOpCodeUnlocked(self, opctx):
907 """Processes one opcode and returns the result.
912 assert op.status == constants.OP_STATUS_WAITLOCK
914 timeout = opctx.GetNextLockTimeout()
917 # Make sure not to hold queue lock while calling ExecOpCode
918 result = self.opexec_fn(op.input,
919 _OpExecCallbacks(self.queue, self.job, op),
920 timeout=timeout, priority=op.priority)
921 except mcpu.LockAcquireTimeout:
922 assert timeout is not None, "Received timeout for blocking acquire"
923 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
925 assert op.status in (constants.OP_STATUS_WAITLOCK,
926 constants.OP_STATUS_CANCELING)
928 # Was job cancelled while we were waiting for the lock?
929 if op.status == constants.OP_STATUS_CANCELING:
930 return (constants.OP_STATUS_CANCELING, None)
932 return (constants.OP_STATUS_QUEUED, None)
934 logging.exception("%s: Canceling job", opctx.log_prefix)
935 assert op.status == constants.OP_STATUS_CANCELING
936 return (constants.OP_STATUS_CANCELING, None)
937 except Exception, err: # pylint: disable-msg=W0703
938 logging.exception("%s: Caught exception in %s",
939 opctx.log_prefix, opctx.summary)
940 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
942 logging.debug("%s: %s successful",
943 opctx.log_prefix, opctx.summary)
944 return (constants.OP_STATUS_SUCCESS, result)
946 def __call__(self, _nextop_fn=None):
947 """Continues execution of a job.
949 @param _nextop_fn: Callback function for tests
@return: True if job is finished, False if processor needs to be called again
958 logging.debug("Processing job %s", job.id)
960 queue.acquire(shared=1)
962 opcount = len(job.ops)
964 # Is a previous opcode still pending?
966 opctx = job.cur_opctx
968 if __debug__ and _nextop_fn:
970 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
975 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
976 constants.OP_STATUS_CANCELED)
977 for i in job.ops[opctx.index:])
979 assert op.status in (constants.OP_STATUS_QUEUED,
980 constants.OP_STATUS_WAITLOCK,
981 constants.OP_STATUS_CANCELED)
983 assert (op.priority <= constants.OP_PRIO_LOWEST and
984 op.priority >= constants.OP_PRIO_HIGHEST)
986 if op.status != constants.OP_STATUS_CANCELED:
987 # Prepare to start opcode
988 self._MarkWaitlock(job, op)
990 assert op.status == constants.OP_STATUS_WAITLOCK
991 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
994 queue.UpdateJobUnlocked(job)
996 logging.info("%s: opcode %s waiting for locks",
997 opctx.log_prefix, opctx.summary)
1001 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1003 queue.acquire(shared=1)
1005 op.status = op_status
1006 op.result = op_result
1008 if op.status == constants.OP_STATUS_QUEUED:
1009 # Couldn't get locks in time
1010 assert not op.end_timestamp
1013 op.end_timestamp = TimeStampNow()
1015 if op.status == constants.OP_STATUS_CANCELING:
1016 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1017 for i in job.ops[opctx.index:])
1019 assert op.status in constants.OPS_FINALIZED
1021 if op.status == constants.OP_STATUS_QUEUED:
1024 opctx.CheckPriorityIncrease()
1026 # Keep around for another round
1027 job.cur_opctx = opctx
1029 assert (op.priority <= constants.OP_PRIO_LOWEST and
1030 op.priority >= constants.OP_PRIO_HIGHEST)
1032 # In no case must the status be finalized here
1033 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1035 queue.UpdateJobUnlocked(job)
1038 # Ensure all opcodes so far have been successful
1039 assert (opctx.index == 0 or
1040 compat.all(i.status == constants.OP_STATUS_SUCCESS
1041 for i in job.ops[:opctx.index]))
1044 job.cur_opctx = None
1046 if op.status == constants.OP_STATUS_SUCCESS:
1049 elif op.status == constants.OP_STATUS_ERROR:
1050 # Ensure failed opcode has an exception as its result
1051 assert errors.GetEncodedError(job.ops[opctx.index].result)
1053 to_encode = errors.OpExecError("Preceding opcode failed")
1054 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1055 _EncodeOpError(to_encode))
1059 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1060 errors.GetEncodedError(i.result)
1061 for i in job.ops[opctx.index:])
1063 elif op.status == constants.OP_STATUS_CANCELING:
1064 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1065 "Job canceled by request")
1068 elif op.status == constants.OP_STATUS_CANCELED:
1072 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1074 # Finalizing or last opcode?
1075 if finalize or opctx.index == (opcount - 1):
1076 # All opcodes have been run, finalize job
1077 job.end_timestamp = TimeStampNow()
1079 # Write to disk. If the job status is final, this is the final write
1080 # allowed. Once the file has been written, it can be archived anytime.
1081 queue.UpdateJobUnlocked(job)
1083 if finalize or opctx.index == (opcount - 1):
1084 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1092 class _JobQueueWorker(workerpool.BaseWorker):
1093 """The actual job workers.
1096 def RunTask(self, job): # pylint: disable-msg=W0221
This function processes a job. It is closely tied to the L{_QueuedJob} and
1100 L{_QueuedOpCode} classes.
1102 @type job: L{_QueuedJob}
1103 @param job: the job to be processed
1107 assert queue == self.pool.queue
1109 self.SetTaskName("Job%s" % job.id)
1111 proc = mcpu.Processor(queue.context, job.id)
1113 if not _JobProcessor(queue, proc.ExecOpCode, job)():
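# The job is not finished yet; defer the task so it is scheduled again at
# the job's current priority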
1115 raise workerpool.DeferTask(priority=job.CalcPriority())
1118 class _JobQueueWorkerPool(workerpool.WorkerPool):
1119 """Simple class implementing a job-processing workerpool.
1122 def __init__(self, queue):
super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                          JOBQUEUE_THREADS,
                                          _JobQueueWorker)
self.queue = queue
1129 def _RequireOpenQueue(fn):
1130 """Decorator for "public" functions.
1132 This function should be used for all 'public' functions. That is,
1133 functions usually called from other classes. Note that this should
1134 be applied only to methods (not plain functions), since it expects
1135 that the decorated function is called with a first argument that has
a '_queue_filelock' attribute.
1138 @warning: Use this decorator only after locking.ssynchronized
Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass
1147 def wrapper(self, *args, **kwargs):
1148 # pylint: disable-msg=W0212
1149 assert self._queue_filelock is not None, "Queue should be open"
1150 return fn(self, *args, **kwargs)
1154 class JobQueue(object):
1155 """Queue used to manage the jobs.
1157 @cvar _RE_JOB_FILE: regex matching the valid job file names
1160 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
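# Matches job file names such as "job-1234"; group 1 captures the numeric job ID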
1162 def __init__(self, context):
1163 """Constructor for JobQueue.
1165 The constructor will initialize the job queue object and then
1166 start loading the current jobs from disk, either for starting them
(if they were queued) or for aborting them (if they were already running).
1170 @type context: GanetiContext
1171 @param context: the context object for access to the configuration
1172 data and other ganeti objects
1175 self.context = context
1176 self._memcache = weakref.WeakValueDictionary()
1177 self._my_hostname = netutils.Hostname.GetSysName()
# The Big JobQueue lock. If a code block or method acquires it in shared
# mode, it must guarantee concurrency with all the code acquiring it in
# shared mode, including itself. In order not to acquire it at all,
# concurrency must be guaranteed with all code acquiring it in shared mode
# and with all code acquiring it exclusively.
1184 self._lock = locking.SharedLock("JobQueue")
1186 self.acquire = self._lock.acquire
1187 self.release = self._lock.release
1189 # Initialize the queue, and acquire the filelock.
1190 # This ensures no other process is working on the job queue.
1191 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1194 self._last_serial = jstore.ReadSerial()
1195 assert self._last_serial is not None, ("Serial file was modified between"
1196 " check in jstore and here")
1198 # Get initial list of nodes
1199 self._nodes = dict((n.name, n.primary_ip)
1200 for n in self.context.cfg.GetAllNodesInfo().values()
1201 if n.master_candidate)
1203 # Remove master node
1204 self._nodes.pop(self._my_hostname, None)
1206 # TODO: Check consistency across nodes
1208 self._queue_size = 0
1209 self._UpdateQueueSizeUnlocked()
1210 self._drained = self._IsQueueMarkedDrain()
1213 self._wpool = _JobQueueWorkerPool(self)
1215 self._InspectQueue()
1217 self._wpool.TerminateWorkers()
1220 @locking.ssynchronized(_LOCK)
1222 def _InspectQueue(self):
1223 """Loads the whole job queue and resumes unfinished jobs.
1225 This function needs the lock here because WorkerPool.AddTask() may start a
1226 job while we're still doing our work.
1229 logging.info("Inspecting job queue")
1233 all_job_ids = self._GetJobIDsUnlocked()
1234 jobs_count = len(all_job_ids)
1235 lastinfo = time.time()
1236 for idx, job_id in enumerate(all_job_ids):
1237 # Give an update every 1000 jobs or 10 seconds
1238 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1239 idx == (jobs_count - 1)):
1240 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1241 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1242 lastinfo = time.time()
1244 job = self._LoadJobUnlocked(job_id)
1246 # a failure in loading the job can cause 'None' to be returned
1250 status = job.CalcStatus()
1252 if status == constants.JOB_STATUS_QUEUED:
1253 restartjobs.append(job)
1255 elif status in (constants.JOB_STATUS_RUNNING,
1256 constants.JOB_STATUS_WAITLOCK,
1257 constants.JOB_STATUS_CANCELING):
1258 logging.warning("Unfinished job %s found: %s", job.id, job)
1260 if status == constants.JOB_STATUS_WAITLOCK:
1262 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1263 restartjobs.append(job)
1265 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1266 "Unclean master daemon shutdown")
1268 self.UpdateJobUnlocked(job)
1271 logging.info("Restarting %s jobs", len(restartjobs))
1272 self._EnqueueJobs(restartjobs)
1274 logging.info("Job queue inspection finished")
1276 @locking.ssynchronized(_LOCK)
1278 def AddNode(self, node):
1279 """Register a new node with the queue.
1281 @type node: L{objects.Node}
1282 @param node: the node object to be added
1285 node_name = node.name
1286 assert node_name != self._my_hostname
1288 # Clean queue directory on added node
1289 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1290 msg = result.fail_msg
logging.warning("Cannot cleanup queue directory on node %s: %s",
                node_name, msg)
1295 if not node.master_candidate:
1296 # remove if existing, ignoring errors
1297 self._nodes.pop(node_name, None)
1298 # and skip the replication of the job ids
1301 # Upload the whole queue excluding archived jobs
1302 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1304 # Upload current serial file
1305 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1307 for file_name in files:
1309 content = utils.ReadFile(file_name)
result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                            [node.primary_ip],
                                            file_name, content)
1314 msg = result[node_name].fail_msg
1316 logging.error("Failed to upload file %s to node %s: %s",
1317 file_name, node_name, msg)
1319 self._nodes[node_name] = node.primary_ip
1321 @locking.ssynchronized(_LOCK)
1323 def RemoveNode(self, node_name):
1324 """Callback called when removing nodes from the cluster.
1326 @type node_name: str
1327 @param node_name: the name of the node to remove
1330 self._nodes.pop(node_name, None)
1333 def _CheckRpcResult(result, nodes, failmsg):
1334 """Verifies the status of an RPC call.
1336 Since we aim to keep consistency should this node (the current
master) fail, we will log errors if our RPC calls fail, and especially
log the case when more than half of the nodes fail.
1340 @param result: the data as returned from the rpc call
1342 @param nodes: the list of nodes we made the call to
1344 @param failmsg: the identifier to be used for logging
1351 msg = result[node].fail_msg
1354 logging.error("RPC call %s (%s) failed on node %s: %s",
1355 result[node].call, failmsg, node, msg)
1357 success.append(node)
1359 # +1 for the master node
1360 if (len(success) + 1) < len(failed):
1361 # TODO: Handle failing nodes
1362 logging.error("More than half of the nodes failed")
1364 def _GetNodeIp(self):
1365 """Helper for returning the node name/ip list.
1367 @rtype: (list, list)
1368 @return: a tuple of two lists, the first one with the node
1369 names and the second one with the node addresses
1372 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1373 name_list = self._nodes.keys()
1374 addr_list = [self._nodes[name] for name in name_list]
1375 return name_list, addr_list
1377 def _UpdateJobQueueFile(self, file_name, data, replicate):
1378 """Writes a file locally and then replicates it to all nodes.
1380 This function will replace the contents of a file on the local
1381 node and then replicate it to all the other nodes we have.
1383 @type file_name: str
1384 @param file_name: the path of the file to be replicated
1386 @param data: the new contents of the file
1387 @type replicate: boolean
1388 @param replicate: whether to spread the changes to the remote nodes
1391 getents = runtime.GetEnts()
1392 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1393 gid=getents.masterd_gid)
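# If requested, replicate the new contents to all other master candidate nodes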
1396 names, addrs = self._GetNodeIp()
1397 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1398 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1400 def _RenameFilesUnlocked(self, rename):
1401 """Renames a file locally and then replicate the change.
1403 This function will rename a file in the local queue directory
1404 and then replicate this rename to all the other nodes we have.
1406 @type rename: list of (old, new)
1407 @param rename: List containing tuples mapping old to new names
1410 # Rename them locally
1411 for old, new in rename:
1412 utils.RenameFile(old, new, mkdir=True)
1414 # ... and on all nodes
1415 names, addrs = self._GetNodeIp()
1416 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1417 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1420 def _FormatJobID(job_id):
1421 """Convert a job ID to string format.
1423 Currently this just does C{str(job_id)} after performing some
1424 checks, but if we want to change the job id format this will
1425 abstract this change.
1427 @type job_id: int or long
1428 @param job_id: the numeric job id
1430 @return: the formatted job id
1433 if not isinstance(job_id, (int, long)):
1434 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1436 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1441 def _GetArchiveDirectory(cls, job_id):
1442 """Returns the archive directory for a job.
1445 @param job_id: Job identifier
1447 @return: Directory name
1450 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1452 def _NewSerialsUnlocked(self, count):
1453 """Generates a new job identifier.
1455 Job identifiers are unique during the lifetime of a cluster.
1457 @type count: integer
1458 @param count: how many serials to return
@return: a list of strings representing the new job identifiers.
1465 serial = self._last_serial + count
1468 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1469 "%s\n" % serial, True)
1471 result = [self._FormatJobID(v)
for v in range(self._last_serial + 1, serial + 1)]
1473 # Keep it only if we were able to write the file
1474 self._last_serial = serial
1479 def _GetJobPath(job_id):
1480 """Returns the job file for a given job id.
1483 @param job_id: the job identifier
1485 @return: the path to the job file
1488 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1491 def _GetArchivedJobPath(cls, job_id):
1492 """Returns the archived job file for a give job id.
1495 @param job_id: the job identifier
1497 @return: the path to the archived job file
1500 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1501 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1503 def _GetJobIDsUnlocked(self, sort=True):
1504 """Return all known job IDs.
1506 The method only looks at disk because it's a requirement that all
jobs are present on disk (so in the _memcache we don't have any extra IDs).
1511 @param sort: perform sorting on the returned job ids
1513 @return: the list of job IDs
1517 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1518 m = self._RE_JOB_FILE.match(filename)
1520 jlist.append(m.group(1))
1522 jlist = utils.NiceSort(jlist)
1525 def _LoadJobUnlocked(self, job_id):
1526 """Loads a job from the disk or memory.
1528 Given a job id, this will return the cached job object if
1529 existing, or try to load the job from the disk. If loading from
1530 disk, it will also add the job to the cache.
1532 @param job_id: the job id
1533 @rtype: L{_QueuedJob} or None
1534 @return: either None or the job object
1537 job = self._memcache.get(job_id, None)
1539 logging.debug("Found job %s in memcache", job_id)
1543 job = self._LoadJobFromDisk(job_id)
1546 except errors.JobFileCorrupted:
1547 old_path = self._GetJobPath(job_id)
1548 new_path = self._GetArchivedJobPath(job_id)
1549 if old_path == new_path:
1550 # job already archived (future case)
1551 logging.exception("Can't parse job %s", job_id)
1554 logging.exception("Can't parse job %s, will archive.", job_id)
1555 self._RenameFilesUnlocked([(old_path, new_path)])
1558 self._memcache[job_id] = job
1559 logging.debug("Added job %s to the cache", job_id)
1562 def _LoadJobFromDisk(self, job_id):
1563 """Load the given job file from disk.
1565 Given a job file, read, load and restore it in a _QueuedJob format.
1567 @type job_id: string
1568 @param job_id: job identifier
1569 @rtype: L{_QueuedJob} or None
1570 @return: either None or the job object
1573 filepath = self._GetJobPath(job_id)
1574 logging.debug("Loading job from %s", filepath)
1576 raw_data = utils.ReadFile(filepath)
1577 except EnvironmentError, err:
1578 if err.errno in (errno.ENOENT, ):
1583 data = serializer.LoadJson(raw_data)
1584 job = _QueuedJob.Restore(self, data)
1585 except Exception, err: # pylint: disable-msg=W0703
1586 raise errors.JobFileCorrupted(err)
1590 def SafeLoadJobFromDisk(self, job_id):
1591 """Load the given job file from disk.
1593 Given a job file, read, load and restore it in a _QueuedJob format.
1594 In case of error reading the job, it gets returned as None, and the
1595 exception is logged.
1597 @type job_id: string
1598 @param job_id: job identifier
1599 @rtype: L{_QueuedJob} or None
1600 @return: either None or the job object
1604 return self._LoadJobFromDisk(job_id)
1605 except (errors.JobFileCorrupted, EnvironmentError):
1606 logging.exception("Can't load/parse job %s", job_id)
1610 def _IsQueueMarkedDrain():
1611 """Check if the queue is marked from drain.
1613 This currently uses the queue drain file, which makes it a
1614 per-node flag. In the future this can be moved to the config file.
@return: True if the job queue is marked for draining
1620 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1622 def _UpdateQueueSizeUnlocked(self):
1623 """Update the queue size.
1626 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1628 @locking.ssynchronized(_LOCK)
1630 def SetDrainFlag(self, drain_flag):
1631 """Sets the drain flag for the queue.
1633 @type drain_flag: boolean
1634 @param drain_flag: Whether to set or unset the drain flag
1637 getents = runtime.GetEnts()
1640 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1641 uid=getents.masterd_uid, gid=getents.masterd_gid)
1643 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1645 self._drained = drain_flag
1650 def _SubmitJobUnlocked(self, job_id, ops):
1651 """Create and store a new job.
1653 This enters the job into our job queue and also puts it on the new
1654 queue, in order for it to be picked up by the queue processors.
1656 @type job_id: job ID
1657 @param job_id: the job ID for the new job
1659 @param ops: The list of OpCodes that will become the new job.
1660 @rtype: L{_QueuedJob}
1661 @return: the job object to be queued
1662 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1663 @raise errors.JobQueueFull: if the job queue has too many jobs in it
1664 @raise errors.GenericError: If an opcode is not valid
1667 # Ok when sharing the big job queue lock, as the drain file is created when
1668 # the lock is exclusive.
1670 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1672 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1673 raise errors.JobQueueFull()
1675 job = _QueuedJob(self, job_id, ops)
1678 for idx, op in enumerate(job.ops):
1679 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1680 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1681 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1682 " are %s" % (idx, op.priority, allowed))
1685 self.UpdateJobUnlocked(job)
1687 self._queue_size += 1
1689 logging.debug("Adding new job %s to the cache", job_id)
1690 self._memcache[job_id] = job
1694 @locking.ssynchronized(_LOCK)
1696 def SubmitJob(self, ops):
1697 """Create and store a new job.
1699 @see: L{_SubmitJobUnlocked}
1702 job_id = self._NewSerialsUnlocked(1)[0]
1703 self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1706 @locking.ssynchronized(_LOCK)
1708 def SubmitManyJobs(self, jobs):
1709 """Create and store multiple jobs.
1711 @see: L{_SubmitJobUnlocked}
1716 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1717 for job_id, ops in zip(all_job_ids, jobs):
1719 added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1722 except errors.GenericError, err:
1725 results.append((status, data))
1727 self._EnqueueJobs(added_jobs)
1731 def _EnqueueJobs(self, jobs):
1732 """Helper function to add jobs to worker pool's queue.
1735 @param jobs: List of all jobs
1738 self._wpool.AddManyTasks([(job, ) for job in jobs],
1739 priority=[job.CalcPriority() for job in jobs])
1742 def UpdateJobUnlocked(self, job, replicate=True):
1743 """Update a job's on disk storage.
1745 After a job has been modified, this function needs to be called in
order to write the changes to disk and replicate them to the other nodes.
1749 @type job: L{_QueuedJob}
1750 @param job: the changed job
1751 @type replicate: boolean
1752 @param replicate: whether to replicate the change to remote nodes
1755 filename = self._GetJobPath(job.id)
1756 data = serializer.DumpJson(job.Serialize(), indent=False)
1757 logging.debug("Writing job %s to %s", job.id, filename)
1758 self._UpdateJobQueueFile(filename, data, replicate)
1760 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1762 """Waits for changes in a job.
1764 @type job_id: string
1765 @param job_id: Job identifier
1766 @type fields: list of strings
1767 @param fields: Which fields to check for changes
1768 @type prev_job_info: list or None
1769 @param prev_job_info: Last job information returned
1770 @type prev_log_serial: int
1771 @param prev_log_serial: Last job message serial number
1772 @type timeout: float
1773 @param timeout: maximum time to wait in seconds
1774 @rtype: tuple (job info, log entries)
1775 @return: a tuple of the job information as required via
1776 the fields parameter, and the log entries as a list
1778 if the job has not changed and the timeout has expired,
1779 we instead return a special value,
1780 L{constants.JOB_NOTCHANGED}, which should be interpreted
1781 as such by the clients
1784 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1786 helper = _WaitForJobChangesHelper()
1788 return helper(self._GetJobPath(job_id), load_fn,
1789 fields, prev_job_info, prev_log_serial, timeout)
1791 @locking.ssynchronized(_LOCK)
1793 def CancelJob(self, job_id):
1796 This will only succeed if the job has not started yet.
1798 @type job_id: string
1799 @param job_id: job ID of job to be cancelled.
1802 logging.info("Cancelling job %s", job_id)
1804 job = self._LoadJobUnlocked(job_id)
1806 logging.debug("Job %s not found", job_id)
1807 return (False, "Job %s not found" % job_id)
1809 (success, msg) = job.Cancel()
1812 self.UpdateJobUnlocked(job)
1814 return (success, msg)
1817 def _ArchiveJobsUnlocked(self, jobs):
1820 @type jobs: list of L{_QueuedJob}
1821 @param jobs: Job objects
1823 @return: Number of archived jobs
1829 if job.CalcStatus() not in constants.JOBS_FINALIZED:
1830 logging.debug("Job %s is not yet done", job.id)
1833 archive_jobs.append(job)
1835 old = self._GetJobPath(job.id)
1836 new = self._GetArchivedJobPath(job.id)
1837 rename_files.append((old, new))
1839 # TODO: What if 1..n files fail to rename?
1840 self._RenameFilesUnlocked(rename_files)
1842 logging.debug("Successfully archived job(s) %s",
1843 utils.CommaJoin(job.id for job in archive_jobs))
1845 # Since we haven't quite checked, above, if we succeeded or failed renaming
1846 # the files, we update the cached queue size from the filesystem. When we
1847 # get around to fix the TODO: above, we can use the number of actually
1848 # archived jobs to fix this.
1849 self._UpdateQueueSizeUnlocked()
1850 return len(archive_jobs)
1852 @locking.ssynchronized(_LOCK)
1854 def ArchiveJob(self, job_id):
1857 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1859 @type job_id: string
1860 @param job_id: Job ID of job to be archived.
1862 @return: Whether job was archived
1865 logging.info("Archiving job %s", job_id)
1867 job = self._LoadJobUnlocked(job_id)
1869 logging.debug("Job %s not found", job_id)
1872 return self._ArchiveJobsUnlocked([job]) == 1
1874 @locking.ssynchronized(_LOCK)
1876 def AutoArchiveJobs(self, age, timeout):
1877 """Archives all jobs based on age.
1879 The method will archive all jobs which are older than the age
1880 parameter. For jobs that don't have an end timestamp, the start
1881 timestamp will be considered. The special '-1' age will cause
1882 archival of all jobs (that are not running or queued).
1885 @param age: the minimum age in seconds
1888 logging.info("Archiving jobs with age more than %s seconds", age)
1891 end_time = now + timeout
1895 all_job_ids = self._GetJobIDsUnlocked()
1897 for idx, job_id in enumerate(all_job_ids):
1898 last_touched = idx + 1
1900 # Not optimal because jobs could be pending
1901 # TODO: Measure average duration for job archival and take number of
1902 # pending jobs into account.
1903 if time.time() > end_time:
1906 # Returns None if the job failed to load
1907 job = self._LoadJobUnlocked(job_id)
1909 if job.end_timestamp is None:
1910 if job.start_timestamp is None:
1911 job_age = job.received_timestamp
1913 job_age = job.start_timestamp
1915 job_age = job.end_timestamp
1917 if age == -1 or now - job_age[0] > age:
1920 # Archive 10 jobs at a time
1921 if len(pending) >= 10:
1922 archived_count += self._ArchiveJobsUnlocked(pending)
1926 archived_count += self._ArchiveJobsUnlocked(pending)
1928 return (archived_count, len(all_job_ids) - last_touched)
1930 def QueryJobs(self, job_ids, fields):
1931 """Returns a list of jobs in queue.
1934 @param job_ids: sequence of job identifiers or None for all
1936 @param fields: names of fields to return
1938 @return: list one element per job, each element being list with
1939 the requested fields
1945 # Since files are added to/removed from the queue atomically, there's no
1946 # risk of getting the job ids in an inconsistent state.
1947 job_ids = self._GetJobIDsUnlocked()
1950 for job_id in job_ids:
1951 job = self.SafeLoadJobFromDisk(job_id)
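# SafeLoadJobFromDisk returns None (and logs the error) for missing or corrupted jobs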
1953 jobs.append(job.GetInfo(fields))
1959 @locking.ssynchronized(_LOCK)
1962 """Stops the job queue.
This shuts down all the worker threads and closes the queue.
1967 self._wpool.TerminateWorkers()
1969 self._queue_filelock.Close()
1970 self._queue_filelock = None