4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
40 # pylint: disable-msg=E0611
41 from pyinotify import pyinotify
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
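# Each archive subdirectory holds this many jobs (see _GetArchiveDirectory below).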
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
64 # member lock names to be passed to @ssynchronized decorator
69 class CancelJob(Exception):
70 """Special exception to cancel a job.
76 """Returns the current timestamp.
79 @return: the current time in the (seconds, microseconds) format
82 return utils.SplitTime(time.time())
85 class _QueuedOpCode(object):
86 """Encapsulates an opcode object.
88 @ivar log: holds the execution log and consists of tuples
89 of the form C{(log_serial, timestamp, level, message)}
90 @ivar input: the OpCode we encapsulate
91 @ivar status: the current status
92 @ivar result: the result of the LU execution
93 @ivar start_timestamp: timestamp for the start of the execution
94 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 @ivar stop_timestamp: timestamp for the end of the execution
98 __slots__ = ["input", "status", "result", "log", "priority",
99 "start_timestamp", "exec_timestamp", "end_timestamp",
102 def __init__(self, op):
103 """Constructor for the _QuededOpCode.
105 @type op: L{opcodes.OpCode}
106 @param op: the opcode we encapsulate
110 self.status = constants.OP_STATUS_QUEUED
113 self.start_timestamp = None
114 self.exec_timestamp = None
115 self.end_timestamp = None
117 # Get initial priority (it might change during the lifetime of this opcode)
118 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
121 def Restore(cls, state):
122 """Restore the _QueuedOpCode from the serialized form.
125 @param state: the serialized state
126 @rtype: _QueuedOpCode
127 @return: a new _QueuedOpCode instance
130 obj = _QueuedOpCode.__new__(cls)
131 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132 obj.status = state["status"]
133 obj.result = state["result"]
134 obj.log = state["log"]
135 obj.start_timestamp = state.get("start_timestamp", None)
136 obj.exec_timestamp = state.get("exec_timestamp", None)
137 obj.end_timestamp = state.get("end_timestamp", None)
138 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
142 """Serializes this _QueuedOpCode.
145 @return: the dictionary holding the serialized state
149 "input": self.input.__getstate__(),
150 "status": self.status,
151 "result": self.result,
153 "start_timestamp": self.start_timestamp,
154 "exec_timestamp": self.exec_timestamp,
155 "end_timestamp": self.end_timestamp,
156 "priority": self.priority,
160 class _QueuedJob(object):
161 """In-memory job representation.
163 This is what we use to track the user-submitted jobs. Locking must
164 be taken care of by users of this class.
166 @type queue: L{JobQueue}
167 @ivar queue: the parent queue
170 @ivar ops: the list of _QueuedOpCode that constitute the job
171 @type log_serial: int
172 @ivar log_serial: holds the index for the next log entry
173 @ivar received_timestamp: the timestamp for when the job was received
174   @ivar start_timestamp: the timestamp for start of execution
175 @ivar end_timestamp: the timestamp for end of execution
178 # pylint: disable-msg=W0212
179 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 "received_timestamp", "start_timestamp", "end_timestamp",
183 def __init__(self, queue, job_id, ops):
184 """Constructor for the _QueuedJob.
186 @type queue: L{JobQueue}
187 @param queue: our parent queue
189 @param job_id: our job id
191 @param ops: the list of opcodes we hold, which will be encapsulated
196 raise errors.GenericError("A job needs at least one opcode")
200 self.ops = [_QueuedOpCode(op) for op in ops]
202 self.received_timestamp = TimeStampNow()
203 self.start_timestamp = None
204 self.end_timestamp = None
206 self._InitInMemory(self)
209 def _InitInMemory(obj):
210 """Initializes in-memory variables.
217 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
219 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
221 return "<%s at %#x>" % (" ".join(status), id(self))
224 def Restore(cls, queue, state):
225 """Restore a _QueuedJob from serialized state:
227 @type queue: L{JobQueue}
228 @param queue: to which queue the restored job belongs
230 @param state: the serialized state
232     @return: the restored _QueuedJob instance
235 obj = _QueuedJob.__new__(cls)
238 obj.received_timestamp = state.get("received_timestamp", None)
239 obj.start_timestamp = state.get("start_timestamp", None)
240 obj.end_timestamp = state.get("end_timestamp", None)
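# Rebuild the opcode objects and track the highest log serial seen, so that
# new log entries continue the existing sequence.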
244 for op_state in state["ops"]:
245 op = _QueuedOpCode.Restore(op_state)
246 for log_entry in op.log:
247 obj.log_serial = max(obj.log_serial, log_entry[0])
250 cls._InitInMemory(obj)
255 """Serialize the _JobQueue instance.
258 @return: the serialized state
263 "ops": [op.Serialize() for op in self.ops],
264 "start_timestamp": self.start_timestamp,
265 "end_timestamp": self.end_timestamp,
266 "received_timestamp": self.received_timestamp,
269 def CalcStatus(self):
270 """Compute the status of this job.
272 This function iterates over all the _QueuedOpCodes in the job and
273 based on their status, computes the job status.
276 - if we find a cancelled, or finished with error, the job
277 status will be the same
278 - otherwise, the last opcode with the status one of:
283 will determine the job status
285 - otherwise, it means either all opcodes are queued, or success,
286 and the job status will be the same
288 @return: the job status
291 status = constants.JOB_STATUS_QUEUED
295 if op.status == constants.OP_STATUS_SUCCESS:
300 if op.status == constants.OP_STATUS_QUEUED:
302 elif op.status == constants.OP_STATUS_WAITLOCK:
303 status = constants.JOB_STATUS_WAITLOCK
304 elif op.status == constants.OP_STATUS_RUNNING:
305 status = constants.JOB_STATUS_RUNNING
306 elif op.status == constants.OP_STATUS_CANCELING:
307 status = constants.JOB_STATUS_CANCELING
309 elif op.status == constants.OP_STATUS_ERROR:
310 status = constants.JOB_STATUS_ERROR
311 # The whole job fails if one opcode failed
313 elif op.status == constants.OP_STATUS_CANCELED:
314         status = constants.JOB_STATUS_CANCELED
318 status = constants.JOB_STATUS_SUCCESS
322 def CalcPriority(self):
323 """Gets the current priority for this job.
325 Only unfinished opcodes are considered. When all are done, the default
331 priorities = [op.priority for op in self.ops
332 if op.status not in constants.OPS_FINALIZED]
335 # All opcodes are done, assume default priority
336 return constants.OP_PRIO_DEFAULT
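# Lower numbers mean higher priority, so the job runs at the highest
# priority among its unfinished opcodes.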
338 return min(priorities)
340 def GetLogEntries(self, newer_than):
341 """Selectively returns the log entries.
343 @type newer_than: None or int
344 @param newer_than: if this is None, return all log entries,
345 otherwise return only the log entries with serial higher
348 @return: the list of the log entries selected
351 if newer_than is None:
358 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
362 def GetInfo(self, fields):
363 """Returns information about a job.
366 @param fields: names of fields to return
368 @return: list with one element for each field
369 @raise errors.OpExecError: when an invalid field
377 elif fname == "status":
378 row.append(self.CalcStatus())
379 elif fname == "priority":
380 row.append(self.CalcPriority())
382 row.append([op.input.__getstate__() for op in self.ops])
383 elif fname == "opresult":
384 row.append([op.result for op in self.ops])
385 elif fname == "opstatus":
386 row.append([op.status for op in self.ops])
387 elif fname == "oplog":
388 row.append([op.log for op in self.ops])
389 elif fname == "opstart":
390 row.append([op.start_timestamp for op in self.ops])
391 elif fname == "opexec":
392 row.append([op.exec_timestamp for op in self.ops])
393 elif fname == "opend":
394 row.append([op.end_timestamp for op in self.ops])
395 elif fname == "oppriority":
396 row.append([op.priority for op in self.ops])
397 elif fname == "received_ts":
398 row.append(self.received_timestamp)
399 elif fname == "start_ts":
400 row.append(self.start_timestamp)
401 elif fname == "end_ts":
402 row.append(self.end_timestamp)
403 elif fname == "summary":
404 row.append([op.input.Summary() for op in self.ops])
406 raise errors.OpExecError("Invalid self query field '%s'" % fname)
409 def MarkUnfinishedOps(self, status, result):
410 """Mark unfinished opcodes with a given status and result.
412     This is a utility function for marking all running or waiting to
413 be run opcodes with a given status. Opcodes which are already
414 finalised are not changed.
416 @param status: a given opcode status
417 @param result: the opcode result
422 if op.status in constants.OPS_FINALIZED:
423 assert not_marked, "Finalized opcodes found after non-finalized ones"
430 """Marks job as canceled/-ing if possible.
432 @rtype: tuple; (bool, string)
433 @return: Boolean describing whether job was successfully canceled or marked
434 as canceling and a text message
437 status = self.CalcStatus()
439 if status == constants.JOB_STATUS_QUEUED:
440 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441 "Job canceled by request")
442 return (True, "Job %s canceled" % self.id)
444 elif status == constants.JOB_STATUS_WAITLOCK:
445 # The worker will notice the new status and cancel the job
446 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
447 return (True, "Job %s will be canceled" % self.id)
450 logging.debug("Job %s is no longer waiting in the queue", self.id)
451 return (False, "Job %s is no longer waiting in the queue" % self.id)
454 class _OpExecCallbacks(mcpu.OpExecCbBase):
455 def __init__(self, queue, job, op):
456 """Initializes this class.
458 @type queue: L{JobQueue}
459 @param queue: Job queue
460 @type job: L{_QueuedJob}
461 @param job: Job object
462 @type op: L{_QueuedOpCode}
466 assert queue, "Queue is missing"
467 assert job, "Job is missing"
468 assert op, "Opcode is missing"
474 def _CheckCancel(self):
475 """Raises an exception to cancel the job if asked to.
478 # Cancel here if we were asked to
479 if self._op.status == constants.OP_STATUS_CANCELING:
480 logging.debug("Canceling opcode")
483 @locking.ssynchronized(_QUEUE, shared=1)
484 def NotifyStart(self):
485 """Mark the opcode as running, not lock-waiting.
487 This is called from the mcpu code as a notifier function, when the LU is
488 finally about to start the Exec() method. Of course, to have end-user
489 visible results, the opcode must be initially (before calling into
490 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
493 assert self._op in self._job.ops
494 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
495 constants.OP_STATUS_CANCELING)
497 # Cancel here if we were asked to
500 logging.debug("Opcode is now running")
502 self._op.status = constants.OP_STATUS_RUNNING
503 self._op.exec_timestamp = TimeStampNow()
505 # And finally replicate the job status
506 self._queue.UpdateJobUnlocked(self._job)
508 @locking.ssynchronized(_QUEUE, shared=1)
509 def _AppendFeedback(self, timestamp, log_type, log_msg):
510 """Internal feedback append function, with locks
513 self._job.log_serial += 1
514 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
515 self._queue.UpdateJobUnlocked(self._job, replicate=False)
517 def Feedback(self, *args):
518 """Append a log entry.
524 log_type = constants.ELOG_MESSAGE
527 (log_type, log_msg) = args
529 # The time is split to make serialization easier and not lose
531 timestamp = utils.SplitTime(time.time())
532 self._AppendFeedback(timestamp, log_type, log_msg)
534 def CheckCancel(self):
535 """Check whether job has been cancelled.
538 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
539 constants.OP_STATUS_CANCELING)
541 # Cancel here if we were asked to
545 class _JobChangesChecker(object):
546 def __init__(self, fields, prev_job_info, prev_log_serial):
547 """Initializes this class.
549 @type fields: list of strings
550 @param fields: Fields requested by LUXI client
551     @type prev_job_info: list or None
552 @param prev_job_info: previous job info, as passed by the LUXI client
553     @type prev_log_serial: int
554 @param prev_log_serial: previous job serial, as passed by the LUXI client
557 self._fields = fields
558 self._prev_job_info = prev_job_info
559 self._prev_log_serial = prev_log_serial
561 def __call__(self, job):
562 """Checks whether job has changed.
564 @type job: L{_QueuedJob}
565 @param job: Job object
568 status = job.CalcStatus()
569 job_info = job.GetInfo(self._fields)
570 log_entries = job.GetLogEntries(self._prev_log_serial)
572 # Serializing and deserializing data can cause type changes (e.g. from
573 # tuple to list) or precision loss. We're doing it here so that we get
574 # the same modifications as the data received from the client. Without
575 # this, the comparison afterwards might fail without the data being
576 # significantly different.
577 # TODO: we just deserialized from disk, investigate how to make sure that
578 # the job info and log entries are compatible to avoid this further step.
579 # TODO: Doing something like in testutils.py:UnifyValueType might be more
580 # efficient, though floats will be tricky
581 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
582 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
584 # Don't even try to wait if the job is no longer running, there will be
586 if (status not in (constants.JOB_STATUS_QUEUED,
587 constants.JOB_STATUS_RUNNING,
588 constants.JOB_STATUS_WAITLOCK) or
589 job_info != self._prev_job_info or
590 (log_entries and self._prev_log_serial != log_entries[0][0])):
591 logging.debug("Job %s changed", job.id)
592 return (job_info, log_entries)
597 class _JobFileChangesWaiter(object):
598 def __init__(self, filename):
599 """Initializes this class.
601 @type filename: string
602 @param filename: Path to job file
603 @raises errors.InotifyError: if the notifier cannot be setup
606 self._wm = pyinotify.WatchManager()
607 self._inotify_handler = \
608 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
610 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
612 self._inotify_handler.enable()
614 # pyinotify doesn't close file descriptors automatically
615 self._notifier.stop()
618 def _OnInotify(self, notifier_enabled):
619 """Callback for inotify.
622 if not notifier_enabled:
623 self._inotify_handler.enable()
625 def Wait(self, timeout):
626 """Waits for the job file to change.
629 @param timeout: Timeout in seconds
630 @return: Whether there have been events
634 have_events = self._notifier.check_events(timeout * 1000)
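# pyinotify's check_events() expects milliseconds, hence the conversion;
# read and dispatch whatever events arrived.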
636 self._notifier.read_events()
637 self._notifier.process_events()
641 """Closes underlying notifier and its file descriptor.
644 self._notifier.stop()
647 class _JobChangesWaiter(object):
648 def __init__(self, filename):
649 """Initializes this class.
651 @type filename: string
652 @param filename: Path to job file
655 self._filewaiter = None
656 self._filename = filename
658 def Wait(self, timeout):
659 """Waits for a job to change.
662 @param timeout: Timeout in seconds
663 @return: Whether there have been events
667 return self._filewaiter.Wait(timeout)
669 # Lazy setup: Avoid inotify setup cost when job file has already changed.
670 # If this point is reached, return immediately and let caller check the job
671 # file again in case there were changes since the last check. This avoids a
673 self._filewaiter = _JobFileChangesWaiter(self._filename)
678 """Closes underlying waiter.
682 self._filewaiter.Close()
685 class _WaitForJobChangesHelper(object):
686 """Helper class using inotify to wait for changes in a job file.
688 This class takes a previous job status and serial, and alerts the client when
689 the current job status has changed.
693 def _CheckForChanges(job_load_fn, check_fn):
696 raise errors.JobLost()
698 result = check_fn(job)
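# A result of None means the job hasn't changed; utils.Retry will call this
# function again until it does or the timeout expires.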
700 raise utils.RetryAgain()
704 def __call__(self, filename, job_load_fn,
705 fields, prev_job_info, prev_log_serial, timeout):
706 """Waits for changes on a job.
708 @type filename: string
709 @param filename: File on which to wait for changes
710 @type job_load_fn: callable
711 @param job_load_fn: Function to load job
712 @type fields: list of strings
713 @param fields: Which fields to check for changes
714 @type prev_job_info: list or None
715 @param prev_job_info: Last job information returned
716 @type prev_log_serial: int
717 @param prev_log_serial: Last job message serial number
719 @param timeout: maximum time to wait in seconds
723 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724 waiter = _JobChangesWaiter(filename)
726 return utils.Retry(compat.partial(self._CheckForChanges,
727 job_load_fn, check_fn),
728 utils.RETRY_REMAINING_TIME, timeout,
732 except (errors.InotifyError, errors.JobLost):
734 except utils.RetryTimeout:
735 return constants.JOB_NOTCHANGED
738 def _EncodeOpError(err):
739 """Encodes an error which occurred while processing an opcode.
742 if isinstance(err, errors.GenericError):
745 to_encode = errors.OpExecError(str(err))
747 return errors.EncodeException(to_encode)
750 class _TimeoutStrategyWrapper:
751 def __init__(self, fn):
752 """Initializes this class.
759 """Gets the next timeout if necessary.
762 if self._next is None:
763 self._next = self._fn()
766 """Returns the next timeout.
773 """Returns the current timeout and advances the internal state.
782 class _OpExecContext:
783 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784 """Initializes this class.
789 self.log_prefix = log_prefix
790 self.summary = op.input.Summary()
792 self._timeout_strategy_factory = timeout_strategy_factory
793 self._ResetTimeoutStrategy()
795 def _ResetTimeoutStrategy(self):
796 """Creates a new timeout strategy.
799 self._timeout_strategy = \
800 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
802 def CheckPriorityIncrease(self):
803 """Checks whether priority can and should be increased.
805 Called when locks couldn't be acquired.
810 # Exhausted all retries and next round should not use blocking acquire
812 if (self._timeout_strategy.Peek() is None and
813 op.priority > constants.OP_PRIO_HIGHEST):
814 logging.debug("Increasing priority")
816 self._ResetTimeoutStrategy()
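# Returning True signals the caller that the priority changed and the
# on-disk job file needs to be updated.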
821 def GetNextLockTimeout(self):
822 """Returns the next lock acquire timeout.
825 return self._timeout_strategy.Next()
828 class _JobProcessor(object):
829 def __init__(self, queue, opexec_fn, job,
830 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831 """Initializes this class.
835 self.opexec_fn = opexec_fn
837 self._timeout_strategy_factory = _timeout_strategy_factory
840 def _FindNextOpcode(job, timeout_strategy_factory):
841 """Locates the next opcode to run.
843 @type job: L{_QueuedJob}
844 @param job: Job object
845 @param timeout_strategy_factory: Callable to create new timeout strategy
848 # Create some sort of a cache to speed up locating next opcode for future
850 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
851 # pending and one for processed ops.
852 if job.ops_iter is None:
853 job.ops_iter = enumerate(job.ops)
855 # Find next opcode to run
858 (idx, op) = job.ops_iter.next()
859 except StopIteration:
860 raise errors.ProgrammerError("Called for a finished job")
862 if op.status == constants.OP_STATUS_RUNNING:
863 # Found an opcode already marked as running
864 raise errors.ProgrammerError("Called for job marked as running")
866 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867 timeout_strategy_factory)
869 if op.status == constants.OP_STATUS_CANCELED:
870 # Cancelled jobs are handled by the caller
871 assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872 for i in job.ops[idx:])
874 elif op.status in constants.OPS_FINALIZED:
875 # This is a job that was partially completed before master daemon
876 # shutdown, so it can be expected that some opcodes are already
877 # completed successfully (if any did error out, then the whole job
878 # should have been aborted and not resubmitted for processing).
879 logging.info("%s: opcode %s already processed, skipping",
880 opctx.log_prefix, opctx.summary)
886 def _MarkWaitlock(job, op):
887 """Marks an opcode as waiting for locks.
889 The job's start timestamp is also set if necessary.
891 @type job: L{_QueuedJob}
892 @param job: Job object
893 @type op: L{_QueuedOpCode}
894 @param op: Opcode object
898 assert op.status in (constants.OP_STATUS_QUEUED,
899 constants.OP_STATUS_WAITLOCK)
905 if op.status == constants.OP_STATUS_QUEUED:
906 op.status = constants.OP_STATUS_WAITLOCK
909 if op.start_timestamp is None:
910 op.start_timestamp = TimeStampNow()
913 if job.start_timestamp is None:
914 job.start_timestamp = op.start_timestamp
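# The caller uses the return value to decide whether the job file must be
# written back to disk.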
917 assert op.status == constants.OP_STATUS_WAITLOCK
921 def _ExecOpCodeUnlocked(self, opctx):
922 """Processes one opcode and returns the result.
927 assert op.status == constants.OP_STATUS_WAITLOCK
929 timeout = opctx.GetNextLockTimeout()
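# A timeout of None requests a blocking lock acquire; a finite timeout lets
# the opcode be retried later, possibly at a higher priority.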
932 # Make sure not to hold queue lock while calling ExecOpCode
933 result = self.opexec_fn(op.input,
934 _OpExecCallbacks(self.queue, self.job, op),
935 timeout=timeout, priority=op.priority)
936 except mcpu.LockAcquireTimeout:
937 assert timeout is not None, "Received timeout for blocking acquire"
938 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
940 assert op.status in (constants.OP_STATUS_WAITLOCK,
941 constants.OP_STATUS_CANCELING)
943 # Was job cancelled while we were waiting for the lock?
944 if op.status == constants.OP_STATUS_CANCELING:
945 return (constants.OP_STATUS_CANCELING, None)
947 # Stay in waitlock while trying to re-acquire lock
948 return (constants.OP_STATUS_WAITLOCK, None)
950 logging.exception("%s: Canceling job", opctx.log_prefix)
951 assert op.status == constants.OP_STATUS_CANCELING
952 return (constants.OP_STATUS_CANCELING, None)
953 except Exception, err: # pylint: disable-msg=W0703
954 logging.exception("%s: Caught exception in %s",
955 opctx.log_prefix, opctx.summary)
956 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
958 logging.debug("%s: %s successful",
959 opctx.log_prefix, opctx.summary)
960 return (constants.OP_STATUS_SUCCESS, result)
962 def __call__(self, _nextop_fn=None):
963 """Continues execution of a job.
965 @param _nextop_fn: Callback function for tests
967 @return: True if job is finished, False if processor needs to be called
974 logging.debug("Processing job %s", job.id)
976 queue.acquire(shared=1)
978 opcount = len(job.ops)
980 # Is a previous opcode still pending?
982 opctx = job.cur_opctx
985 if __debug__ and _nextop_fn:
987 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
992 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
993 constants.OP_STATUS_CANCELING,
994 constants.OP_STATUS_CANCELED)
995 for i in job.ops[opctx.index + 1:])
997 assert op.status in (constants.OP_STATUS_QUEUED,
998 constants.OP_STATUS_WAITLOCK,
999 constants.OP_STATUS_CANCELING,
1000 constants.OP_STATUS_CANCELED)
1002 assert (op.priority <= constants.OP_PRIO_LOWEST and
1003 op.priority >= constants.OP_PRIO_HIGHEST)
1005 if op.status not in (constants.OP_STATUS_CANCELING,
1006 constants.OP_STATUS_CANCELED):
1007 assert op.status in (constants.OP_STATUS_QUEUED,
1008 constants.OP_STATUS_WAITLOCK)
1010 # Prepare to start opcode
1011 if self._MarkWaitlock(job, op):
1013 queue.UpdateJobUnlocked(job)
1015 assert op.status == constants.OP_STATUS_WAITLOCK
1016 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1017 assert job.start_timestamp and op.start_timestamp
1019 logging.info("%s: opcode %s waiting for locks",
1020 opctx.log_prefix, opctx.summary)
1024 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
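# Re-acquire the queue lock before updating shared job state.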
1026 queue.acquire(shared=1)
1028 op.status = op_status
1029 op.result = op_result
1031 if op.status == constants.OP_STATUS_WAITLOCK:
1032 # Couldn't get locks in time
1033 assert not op.end_timestamp
1036 op.end_timestamp = TimeStampNow()
1038 if op.status == constants.OP_STATUS_CANCELING:
1039 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1040 for i in job.ops[opctx.index:])
1042 assert op.status in constants.OPS_FINALIZED
1044 if op.status == constants.OP_STATUS_WAITLOCK:
1047 if opctx.CheckPriorityIncrease():
1048 # Priority was changed, need to update on-disk file
1049 queue.UpdateJobUnlocked(job)
1051 # Keep around for another round
1052 job.cur_opctx = opctx
1054 assert (op.priority <= constants.OP_PRIO_LOWEST and
1055 op.priority >= constants.OP_PRIO_HIGHEST)
1057 # In no case must the status be finalized here
1058 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1061 # Ensure all opcodes so far have been successful
1062 assert (opctx.index == 0 or
1063 compat.all(i.status == constants.OP_STATUS_SUCCESS
1064 for i in job.ops[:opctx.index]))
1067 job.cur_opctx = None
1069 if op.status == constants.OP_STATUS_SUCCESS:
1072 elif op.status == constants.OP_STATUS_ERROR:
1073 # Ensure failed opcode has an exception as its result
1074 assert errors.GetEncodedError(job.ops[opctx.index].result)
1076 to_encode = errors.OpExecError("Preceding opcode failed")
1077 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1078 _EncodeOpError(to_encode))
1082 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1083 errors.GetEncodedError(i.result)
1084 for i in job.ops[opctx.index:])
1086 elif op.status == constants.OP_STATUS_CANCELING:
1087 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1088 "Job canceled by request")
1091 elif op.status == constants.OP_STATUS_CANCELED:
1095 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1097 # Finalizing or last opcode?
1098 if finalize or opctx.index == (opcount - 1):
1099 # All opcodes have been run, finalize job
1100 job.end_timestamp = TimeStampNow()
1102 # Write to disk. If the job status is final, this is the final write
1103 # allowed. Once the file has been written, it can be archived anytime.
1104 queue.UpdateJobUnlocked(job)
1106 if finalize or opctx.index == (opcount - 1):
1107 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1115 class _JobQueueWorker(workerpool.BaseWorker):
1116 """The actual job workers.
1119 def RunTask(self, job): # pylint: disable-msg=W0221
1122 This functions processes a job. It is closely tied to the L{_QueuedJob} and
1123 L{_QueuedOpCode} classes.
1125 @type job: L{_QueuedJob}
1126 @param job: the job to be processed
1130 assert queue == self.pool.queue
1132 self.SetTaskName("Job%s" % job.id)
1134 proc = mcpu.Processor(queue.context, job.id)
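# The processor returns True once the job is finished; otherwise defer the
# task so jobs with higher priority can be scheduled in between.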
1136 if not _JobProcessor(queue, proc.ExecOpCode, job)():
1138 raise workerpool.DeferTask(priority=job.CalcPriority())
1141 class _JobQueueWorkerPool(workerpool.WorkerPool):
1142 """Simple class implementing a job-processing workerpool.
1145 def __init__(self, queue):
1146 super(_JobQueueWorkerPool, self).__init__("JobQueue",
1152 def _RequireOpenQueue(fn):
1153 """Decorator for "public" functions.
1155 This function should be used for all 'public' functions. That is,
1156 functions usually called from other classes. Note that this should
1157 be applied only to methods (not plain functions), since it expects
1158 that the decorated function is called with a first argument that has
1159   a '_queue_filelock' attribute.
1161 @warning: Use this decorator only after locking.ssynchronized
1164 @locking.ssynchronized(_LOCK)
1170 def wrapper(self, *args, **kwargs):
1171 # pylint: disable-msg=W0212
1172 assert self._queue_filelock is not None, "Queue should be open"
1173 return fn(self, *args, **kwargs)
1177 class JobQueue(object):
1178 """Queue used to manage the jobs.
1180 @cvar _RE_JOB_FILE: regex matching the valid job file names
1183 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1185 def __init__(self, context):
1186 """Constructor for JobQueue.
1188 The constructor will initialize the job queue object and then
1189 start loading the current jobs from disk, either for starting them
1190     (if they were queued) or for aborting them (if they were already
1193 @type context: GanetiContext
1194 @param context: the context object for access to the configuration
1195 data and other ganeti objects
1198 self.context = context
1199 self._memcache = weakref.WeakValueDictionary()
1200 self._my_hostname = netutils.Hostname.GetSysName()
1202 # The Big JobQueue lock. If a code block or method acquires it in shared
1203     # mode, it must guarantee concurrency with all the code acquiring it in
1204     # shared mode, including itself. In order not to acquire it at all,
1205 # concurrency must be guaranteed with all code acquiring it in shared mode
1206 # and all code acquiring it exclusively.
1207 self._lock = locking.SharedLock("JobQueue")
1209 self.acquire = self._lock.acquire
1210 self.release = self._lock.release
1212 # Initialize the queue, and acquire the filelock.
1213 # This ensures no other process is working on the job queue.
1214 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1217 self._last_serial = jstore.ReadSerial()
1218 assert self._last_serial is not None, ("Serial file was modified between"
1219 " check in jstore and here")
1221 # Get initial list of nodes
1222 self._nodes = dict((n.name, n.primary_ip)
1223 for n in self.context.cfg.GetAllNodesInfo().values()
1224 if n.master_candidate)
1226 # Remove master node
1227 self._nodes.pop(self._my_hostname, None)
1229 # TODO: Check consistency across nodes
1231 self._queue_size = 0
1232 self._UpdateQueueSizeUnlocked()
1233 self._drained = self._IsQueueMarkedDrain()
1236 self._wpool = _JobQueueWorkerPool(self)
1238 self._InspectQueue()
1240 self._wpool.TerminateWorkers()
1243 @locking.ssynchronized(_LOCK)
1245 def _InspectQueue(self):
1246 """Loads the whole job queue and resumes unfinished jobs.
1248 This function needs the lock here because WorkerPool.AddTask() may start a
1249 job while we're still doing our work.
1252 logging.info("Inspecting job queue")
1256 all_job_ids = self._GetJobIDsUnlocked()
1257 jobs_count = len(all_job_ids)
1258 lastinfo = time.time()
1259 for idx, job_id in enumerate(all_job_ids):
1260 # Give an update every 1000 jobs or 10 seconds
1261 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1262 idx == (jobs_count - 1)):
1263 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1264 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1265 lastinfo = time.time()
1267 job = self._LoadJobUnlocked(job_id)
1269 # a failure in loading the job can cause 'None' to be returned
1273 status = job.CalcStatus()
1275 if status == constants.JOB_STATUS_QUEUED:
1276 restartjobs.append(job)
1278 elif status in (constants.JOB_STATUS_RUNNING,
1279 constants.JOB_STATUS_WAITLOCK,
1280 constants.JOB_STATUS_CANCELING):
1281 logging.warning("Unfinished job %s found: %s", job.id, job)
1283 if status == constants.JOB_STATUS_WAITLOCK:
1285 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1286 restartjobs.append(job)
1288 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1289 "Unclean master daemon shutdown")
1291 self.UpdateJobUnlocked(job)
1294 logging.info("Restarting %s jobs", len(restartjobs))
1295 self._EnqueueJobs(restartjobs)
1297 logging.info("Job queue inspection finished")
1299 @locking.ssynchronized(_LOCK)
1301 def AddNode(self, node):
1302 """Register a new node with the queue.
1304 @type node: L{objects.Node}
1305 @param node: the node object to be added
1308 node_name = node.name
1309 assert node_name != self._my_hostname
1311 # Clean queue directory on added node
1312 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1313 msg = result.fail_msg
1315 logging.warning("Cannot cleanup queue directory on node %s: %s",
1318 if not node.master_candidate:
1319 # remove if existing, ignoring errors
1320 self._nodes.pop(node_name, None)
1321 # and skip the replication of the job ids
1324 # Upload the whole queue excluding archived jobs
1325 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1327 # Upload current serial file
1328 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1330 for file_name in files:
1332 content = utils.ReadFile(file_name)
1334 result = rpc.RpcRunner.call_jobqueue_update([node_name],
1337 msg = result[node_name].fail_msg
1339 logging.error("Failed to upload file %s to node %s: %s",
1340 file_name, node_name, msg)
1342 self._nodes[node_name] = node.primary_ip
1344 @locking.ssynchronized(_LOCK)
1346 def RemoveNode(self, node_name):
1347 """Callback called when removing nodes from the cluster.
1349 @type node_name: str
1350 @param node_name: the name of the node to remove
1353 self._nodes.pop(node_name, None)
1356 def _CheckRpcResult(result, nodes, failmsg):
1357 """Verifies the status of an RPC call.
1359 Since we aim to keep consistency should this node (the current
1360     master) fail, we will log errors if our rpc calls fail, and especially
1361     log the case when more than half of the nodes fail.
1363 @param result: the data as returned from the rpc call
1365 @param nodes: the list of nodes we made the call to
1367 @param failmsg: the identifier to be used for logging
1374 msg = result[node].fail_msg
1377 logging.error("RPC call %s (%s) failed on node %s: %s",
1378 result[node].call, failmsg, node, msg)
1380 success.append(node)
1382 # +1 for the master node
1383 if (len(success) + 1) < len(failed):
1384 # TODO: Handle failing nodes
1385 logging.error("More than half of the nodes failed")
1387 def _GetNodeIp(self):
1388 """Helper for returning the node name/ip list.
1390 @rtype: (list, list)
1391 @return: a tuple of two lists, the first one with the node
1392 names and the second one with the node addresses
1395 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1396 name_list = self._nodes.keys()
1397 addr_list = [self._nodes[name] for name in name_list]
1398 return name_list, addr_list
1400 def _UpdateJobQueueFile(self, file_name, data, replicate):
1401 """Writes a file locally and then replicates it to all nodes.
1403 This function will replace the contents of a file on the local
1404 node and then replicate it to all the other nodes we have.
1406 @type file_name: str
1407 @param file_name: the path of the file to be replicated
1409 @param data: the new contents of the file
1410 @type replicate: boolean
1411 @param replicate: whether to spread the changes to the remote nodes
1414 getents = runtime.GetEnts()
1415 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1416 gid=getents.masterd_gid)
1419 names, addrs = self._GetNodeIp()
1420 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1421 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1423 def _RenameFilesUnlocked(self, rename):
1424 """Renames a file locally and then replicate the change.
1426 This function will rename a file in the local queue directory
1427 and then replicate this rename to all the other nodes we have.
1429 @type rename: list of (old, new)
1430 @param rename: List containing tuples mapping old to new names
1433 # Rename them locally
1434 for old, new in rename:
1435 utils.RenameFile(old, new, mkdir=True)
1437 # ... and on all nodes
1438 names, addrs = self._GetNodeIp()
1439 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1440 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1443 def _FormatJobID(job_id):
1444 """Convert a job ID to string format.
1446 Currently this just does C{str(job_id)} after performing some
1447 checks, but if we want to change the job id format this will
1448 abstract this change.
1450 @type job_id: int or long
1451 @param job_id: the numeric job id
1453 @return: the formatted job id
1456 if not isinstance(job_id, (int, long)):
1457 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1459 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1464 def _GetArchiveDirectory(cls, job_id):
1465 """Returns the archive directory for a job.
1468 @param job_id: Job identifier
1470 @return: Directory name
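# Archived jobs are grouped into numbered subdirectories of
# JOBS_PER_ARCHIVE_DIRECTORY entries each.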
1473 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1475 def _NewSerialsUnlocked(self, count):
1476 """Generates a new job identifier.
1478 Job identifiers are unique during the lifetime of a cluster.
1480 @type count: integer
1481 @param count: how many serials to return
1483 @return: a string representing the job identifier.
1488 serial = self._last_serial + count
1491 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1492 "%s\n" % serial, True)
1494 result = [self._FormatJobID(v)
1495               for v in range(self._last_serial + 1, serial + 1)]
1496 # Keep it only if we were able to write the file
1497 self._last_serial = serial
1502 def _GetJobPath(job_id):
1503 """Returns the job file for a given job id.
1506 @param job_id: the job identifier
1508 @return: the path to the job file
1511 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1514 def _GetArchivedJobPath(cls, job_id):
1515 """Returns the archived job file for a give job id.
1518 @param job_id: the job identifier
1520 @return: the path to the archived job file
1523 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1524 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1526 def _GetJobIDsUnlocked(self, sort=True):
1527 """Return all known job IDs.
1529 The method only looks at disk because it's a requirement that all
1530 jobs are present on disk (so in the _memcache we don't have any
1534 @param sort: perform sorting on the returned job ids
1536 @return: the list of job IDs
1540 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1541 m = self._RE_JOB_FILE.match(filename)
1543 jlist.append(m.group(1))
1545 jlist = utils.NiceSort(jlist)
1548 def _LoadJobUnlocked(self, job_id):
1549 """Loads a job from the disk or memory.
1551 Given a job id, this will return the cached job object if
1552 existing, or try to load the job from the disk. If loading from
1553 disk, it will also add the job to the cache.
1555 @param job_id: the job id
1556 @rtype: L{_QueuedJob} or None
1557 @return: either None or the job object
1560 job = self._memcache.get(job_id, None)
1562 logging.debug("Found job %s in memcache", job_id)
1566 job = self._LoadJobFromDisk(job_id)
1569 except errors.JobFileCorrupted:
1570 old_path = self._GetJobPath(job_id)
1571 new_path = self._GetArchivedJobPath(job_id)
1572 if old_path == new_path:
1573 # job already archived (future case)
1574 logging.exception("Can't parse job %s", job_id)
1577 logging.exception("Can't parse job %s, will archive.", job_id)
1578 self._RenameFilesUnlocked([(old_path, new_path)])
1581 self._memcache[job_id] = job
1582 logging.debug("Added job %s to the cache", job_id)
1585 def _LoadJobFromDisk(self, job_id):
1586 """Load the given job file from disk.
1588 Given a job file, read, load and restore it in a _QueuedJob format.
1590 @type job_id: string
1591 @param job_id: job identifier
1592 @rtype: L{_QueuedJob} or None
1593 @return: either None or the job object
1596 filepath = self._GetJobPath(job_id)
1597 logging.debug("Loading job from %s", filepath)
1599 raw_data = utils.ReadFile(filepath)
1600 except EnvironmentError, err:
1601 if err.errno in (errno.ENOENT, ):
1606 data = serializer.LoadJson(raw_data)
1607 job = _QueuedJob.Restore(self, data)
1608 except Exception, err: # pylint: disable-msg=W0703
1609 raise errors.JobFileCorrupted(err)
1613 def SafeLoadJobFromDisk(self, job_id):
1614 """Load the given job file from disk.
1616 Given a job file, read, load and restore it in a _QueuedJob format.
1617 In case of error reading the job, it gets returned as None, and the
1618 exception is logged.
1620 @type job_id: string
1621 @param job_id: job identifier
1622 @rtype: L{_QueuedJob} or None
1623 @return: either None or the job object
1627 return self._LoadJobFromDisk(job_id)
1628 except (errors.JobFileCorrupted, EnvironmentError):
1629 logging.exception("Can't load/parse job %s", job_id)
1633 def _IsQueueMarkedDrain():
1634 """Check if the queue is marked from drain.
1636 This currently uses the queue drain file, which makes it a
1637 per-node flag. In the future this can be moved to the config file.
1640     @return: True if the job queue is marked for draining
1643 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1645 def _UpdateQueueSizeUnlocked(self):
1646 """Update the queue size.
1649 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1651 @locking.ssynchronized(_LOCK)
1653 def SetDrainFlag(self, drain_flag):
1654 """Sets the drain flag for the queue.
1656 @type drain_flag: boolean
1657 @param drain_flag: Whether to set or unset the drain flag
1660 getents = runtime.GetEnts()
1663 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1664 uid=getents.masterd_uid, gid=getents.masterd_gid)
1666 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1668 self._drained = drain_flag
1673 def _SubmitJobUnlocked(self, job_id, ops):
1674 """Create and store a new job.
1676 This enters the job into our job queue and also puts it on the new
1677 queue, in order for it to be picked up by the queue processors.
1679 @type job_id: job ID
1680 @param job_id: the job ID for the new job
1682 @param ops: The list of OpCodes that will become the new job.
1683 @rtype: L{_QueuedJob}
1684 @return: the job object to be queued
1685 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1686 @raise errors.JobQueueFull: if the job queue has too many jobs in it
1687 @raise errors.GenericError: If an opcode is not valid
1690 # Ok when sharing the big job queue lock, as the drain file is created when
1691 # the lock is exclusive.
1693 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1695 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1696 raise errors.JobQueueFull()
1698 job = _QueuedJob(self, job_id, ops)
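# Reject the job before writing it to disk if any opcode carries an invalid
# submit priority.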
1701 for idx, op in enumerate(job.ops):
1702 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1703 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1704 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1705 " are %s" % (idx, op.priority, allowed))
1708 self.UpdateJobUnlocked(job)
1710 self._queue_size += 1
1712 logging.debug("Adding new job %s to the cache", job_id)
1713 self._memcache[job_id] = job
1717 @locking.ssynchronized(_LOCK)
1719 def SubmitJob(self, ops):
1720 """Create and store a new job.
1722 @see: L{_SubmitJobUnlocked}
1725 job_id = self._NewSerialsUnlocked(1)[0]
1726 self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1729 @locking.ssynchronized(_LOCK)
1731 def SubmitManyJobs(self, jobs):
1732 """Create and store multiple jobs.
1734 @see: L{_SubmitJobUnlocked}
1739 all_job_ids = self._NewSerialsUnlocked(len(jobs))
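# Reserve all job IDs up front, then submit each job individually; per-job
# failures are reported in the results list instead of aborting the batch.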
1740 for job_id, ops in zip(all_job_ids, jobs):
1742 added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1745 except errors.GenericError, err:
1748 results.append((status, data))
1750 self._EnqueueJobs(added_jobs)
1754 def _EnqueueJobs(self, jobs):
1755 """Helper function to add jobs to worker pool's queue.
1758 @param jobs: List of all jobs
1761 self._wpool.AddManyTasks([(job, ) for job in jobs],
1762 priority=[job.CalcPriority() for job in jobs])
1765 def UpdateJobUnlocked(self, job, replicate=True):
1766 """Update a job's on disk storage.
1768 After a job has been modified, this function needs to be called in
1769 order to write the changes to disk and replicate them to the other
1772 @type job: L{_QueuedJob}
1773 @param job: the changed job
1774 @type replicate: boolean
1775 @param replicate: whether to replicate the change to remote nodes
1778 filename = self._GetJobPath(job.id)
1779 data = serializer.DumpJson(job.Serialize(), indent=False)
1780 logging.debug("Writing job %s to %s", job.id, filename)
1781 self._UpdateJobQueueFile(filename, data, replicate)
1783 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1785 """Waits for changes in a job.
1787 @type job_id: string
1788 @param job_id: Job identifier
1789 @type fields: list of strings
1790 @param fields: Which fields to check for changes
1791 @type prev_job_info: list or None
1792 @param prev_job_info: Last job information returned
1793 @type prev_log_serial: int
1794 @param prev_log_serial: Last job message serial number
1795 @type timeout: float
1796 @param timeout: maximum time to wait in seconds
1797 @rtype: tuple (job info, log entries)
1798 @return: a tuple of the job information as required via
1799 the fields parameter, and the log entries as a list
1801 if the job has not changed and the timeout has expired,
1802 we instead return a special value,
1803 L{constants.JOB_NOTCHANGED}, which should be interpreted
1804 as such by the clients
1807 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
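# The helper reloads the job from disk on every check; inotify on the job
# file is only used to signal that the file changed.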
1809 helper = _WaitForJobChangesHelper()
1811 return helper(self._GetJobPath(job_id), load_fn,
1812 fields, prev_job_info, prev_log_serial, timeout)
1814 @locking.ssynchronized(_LOCK)
1816 def CancelJob(self, job_id):
1819 This will only succeed if the job has not started yet.
1821 @type job_id: string
1822 @param job_id: job ID of job to be cancelled.
1825 logging.info("Cancelling job %s", job_id)
1827 job = self._LoadJobUnlocked(job_id)
1829 logging.debug("Job %s not found", job_id)
1830 return (False, "Job %s not found" % job_id)
1832 (success, msg) = job.Cancel()
1835 self.UpdateJobUnlocked(job)
1837 return (success, msg)
1840 def _ArchiveJobsUnlocked(self, jobs):
1843 @type jobs: list of L{_QueuedJob}
1844 @param jobs: Job objects
1846 @return: Number of archived jobs
1852 if job.CalcStatus() not in constants.JOBS_FINALIZED:
1853 logging.debug("Job %s is not yet done", job.id)
1856 archive_jobs.append(job)
1858 old = self._GetJobPath(job.id)
1859 new = self._GetArchivedJobPath(job.id)
1860 rename_files.append((old, new))
1862 # TODO: What if 1..n files fail to rename?
1863 self._RenameFilesUnlocked(rename_files)
1865 logging.debug("Successfully archived job(s) %s",
1866 utils.CommaJoin(job.id for job in archive_jobs))
1868 # Since we haven't quite checked, above, if we succeeded or failed renaming
1869 # the files, we update the cached queue size from the filesystem. When we
1870 # get around to fix the TODO: above, we can use the number of actually
1871 # archived jobs to fix this.
1872 self._UpdateQueueSizeUnlocked()
1873 return len(archive_jobs)
1875 @locking.ssynchronized(_LOCK)
1877 def ArchiveJob(self, job_id):
1880 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1882 @type job_id: string
1883 @param job_id: Job ID of job to be archived.
1885 @return: Whether job was archived
1888 logging.info("Archiving job %s", job_id)
1890 job = self._LoadJobUnlocked(job_id)
1892 logging.debug("Job %s not found", job_id)
1895 return self._ArchiveJobsUnlocked([job]) == 1
1897 @locking.ssynchronized(_LOCK)
1899 def AutoArchiveJobs(self, age, timeout):
1900 """Archives all jobs based on age.
1902 The method will archive all jobs which are older than the age
1903 parameter. For jobs that don't have an end timestamp, the start
1904 timestamp will be considered. The special '-1' age will cause
1905 archival of all jobs (that are not running or queued).
1908 @param age: the minimum age in seconds
1911 logging.info("Archiving jobs with age more than %s seconds", age)
1914 end_time = now + timeout
1918 all_job_ids = self._GetJobIDsUnlocked()
1920 for idx, job_id in enumerate(all_job_ids):
1921 last_touched = idx + 1
1923 # Not optimal because jobs could be pending
1924 # TODO: Measure average duration for job archival and take number of
1925 # pending jobs into account.
1926 if time.time() > end_time:
1929 # Returns None if the job failed to load
1930 job = self._LoadJobUnlocked(job_id)
1932 if job.end_timestamp is None:
1933 if job.start_timestamp is None:
1934 job_age = job.received_timestamp
1936 job_age = job.start_timestamp
1938 job_age = job.end_timestamp
1940 if age == -1 or now - job_age[0] > age:
1943 # Archive 10 jobs at a time
1944 if len(pending) >= 10:
1945 archived_count += self._ArchiveJobsUnlocked(pending)
1949 archived_count += self._ArchiveJobsUnlocked(pending)
1951 return (archived_count, len(all_job_ids) - last_touched)
1953 def QueryJobs(self, job_ids, fields):
1954 """Returns a list of jobs in queue.
1957 @param job_ids: sequence of job identifiers or None for all
1959 @param fields: names of fields to return
1961     @return: a list with one element per job, each element being a list with
1962 the requested fields
1968 # Since files are added to/removed from the queue atomically, there's no
1969 # risk of getting the job ids in an inconsistent state.
1970 job_ids = self._GetJobIDsUnlocked()
1973 for job_id in job_ids:
1974 job = self.SafeLoadJobFromDisk(job_id)
1976 jobs.append(job.GetInfo(fields))
1982 @locking.ssynchronized(_LOCK)
1985 """Stops the job queue.
1987     This shuts down all the worker threads and closes the queue.
1990 self._wpool.TerminateWorkers()
1992 self._queue_filelock.Close()
1993 self._queue_filelock = None