# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat

JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

class CancelJob(Exception):
  """Special exception to cancel a job.

  """

def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

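# Illustrative example (editor's sketch, not from the original source):
# utils.SplitTime is assumed to split a float timestamp into whole seconds
# and microseconds, so at Unix time 1234567890.5 the call behaves like:
#
#   >>> TimeStampNow()
#   (1234567890, 500000)
#
# All job and opcode timestamps in this module use this
# (seconds, microseconds) tuple format.
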
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               ]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }

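  # A hedged sketch of the intended round-trip (editor's illustration, not
  # executed here): Restore is the inverse of Serialize, so for any opcode
  # op the following is expected to hold:
  #
  #   >>> state = _QueuedOpCode(op).Serialize()
  #   >>> clone = _QueuedOpCode.Restore(state)
  #   >>> clone.Serialize() == state
  #   True
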
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               ]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

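  # Worked example (editor's illustration): for a three-opcode job whose
  # opcode statuses are (success, running, queued), the loop above yields
  # JOB_STATUS_RUNNING; with (success, error, queued) it stops early at the
  # failed opcode and yields JOB_STATUS_ERROR.
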
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

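  # Note on the convention used here (editor's illustration): priorities are
  # numeric and lower values mean "more important", i.e. OP_PRIO_HIGHEST is
  # the numerically smallest value. min() therefore picks the most important
  # pending opcode, e.g.:
  #
  #   >>> min([constants.OP_PRIO_DEFAULT, constants.OP_PRIO_HIGHEST])
  #   ... == constants.OP_PRIO_HIGHEST
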
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

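  # Illustration (editor's sketch): with op.log containing
  # [(1, ts, ELOG_MESSAGE, "a"), (2, ts, ELOG_MESSAGE, "b")],
  # GetLogEntries(1) returns only the entry with serial 2, while
  # GetLogEntries(None) returns both.
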
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

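  # Typical use (hedged sketch; the queue normally drives this while holding
  # its lock; the job id 123 is made up for illustration):
  #
  #   >>> (success, msg) = job.Cancel()    # for a still-queued job
  #   >>> (success, msg)
  #   (True, 'Job 123 canceled')
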
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

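  # Both call forms are accepted (editor's illustration, assuming cb is an
  # _OpExecCallbacks instance):
  #
  #   >>> cb.Feedback("just a message")        # ELOG_MESSAGE is implied
  #   >>> cb.Feedback(constants.ELOG_MESSAGE, "explicitly typed message")
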
  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)

class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None

class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()

class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()

class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

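  # Summary of the return contract (editor's note, derived from the code
  # above): a (job_info, log_entries) tuple when the job changed, None when
  # the job was lost or inotify failed, and constants.JOB_NOTCHANGED when
  # the timeout expired without any change.
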
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)

class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result

class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()

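  # Illustration (editor's sketch): if an opcode keeps failing to acquire
  # its locks, each exhausted timeout strategy decrements op.priority by one
  # (towards OP_PRIO_HIGHEST, the numerically smallest value) and installs a
  # fresh strategy:
  #
  #   >>> ctx.CheckPriorityIncrease()   # retries exhausted
  #   True
  #   >>> ctx.op.priority               # one step more important than before
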
class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])

      elif op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job should be updated on disk

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITLOCK
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITLOCK

    return update

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITLOCK, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      if op.status not in (constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED):
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITLOCK)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
        assert job.start_timestamp and op.start_timestamp

        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        queue.release()
        try:
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = op_status
        op.result = op_result

        if op.status == constants.OP_STATUS_WAITLOCK:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITLOCK:
        finalize = False

        if opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        elif op.status == constants.OP_STATUS_CANCELED:
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        # Finalizing or last opcode?
        if finalize or opctx.index == (opcount - 1):
          # All opcodes have been run, finalize job
          job.end_timestamp = TimeStampNow()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize or opctx.index == (opcount - 1):
          logging.info("Finished job %s, status = %s", job.id,
                       job.CalcStatus())
          return True

      return False
    finally:
      queue.release()

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the L{_QueuedJob} and
    L{_QueuedOpCode} classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    proc = mcpu.Processor(queue.context, job.id)

    if not _JobProcessor(queue, proc.ExecOpCode, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())

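  # Scheduling note (editor's illustration): _JobProcessor() returns False
  # while opcodes remain, so DeferTask re-queues this job in the worker pool
  # at its current priority; the pool then calls RunTask again for the next
  # opcode instead of blocking one worker thread on a whole job.
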
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue

def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper

class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    finished).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = jstore.CheckDrainFlag()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITLOCK:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log errors when an RPC call fails, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

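  # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000,
  # _GetArchiveDirectory("9999") == "0" and
  # _GetArchiveDirectory("12345") == "1", so archived jobs are sharded into
  # directories of at most 10000 entries each.
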
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

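  # Worked example (editor's sketch): with _last_serial == 42,
  # _NewSerialsUnlocked(3) writes "45\n" to the serial file and returns
  # ["43", "44", "45"], after which _last_serial == 45.
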
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        status = False
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

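  # Result shape (editor's illustration): one (status, data) tuple per
  # requested job, e.g. [(True, "17"), (False, "...; opcodes OP_...")];
  # data is the new job id on success and an error description otherwise.
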
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

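  # Usage sketch (editor's illustration; field names are those handled by
  # _QueuedJob.GetInfo, and the id/status values shown are made up):
  #
  #   >>> queue.QueryJobs(None, ["id", "status"])
  #   [['42', 'success'], ['43', 'running']]
  #
  # When an explicit id list is given, unknown job ids yield None entries.
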
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None