4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
40 # pylint: disable-msg=E0611
41 from pyinotify import pyinotify
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
64 # member lock names to be passed to @ssynchronized decorator
69 class CancelJob(Exception):
70 """Special exception to cancel a job.
76 """Returns the current timestamp.
79 @return: the current time in the (seconds, microseconds) format
82 return utils.SplitTime(time.time())
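# Illustrative sketch (not part of the original module): the split-timestamp
# format used throughout this file is the (seconds, microseconds) pair
# produced by utils.SplitTime, e.g.
#
#   secs, usecs = TimeStampNow()      # e.g. (1291231200, 123456)
#   as_float = secs + usecs / 1e6     # back to a plain float, if ever needed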
85 class _QueuedOpCode(object):
86 """Encapsulates an opcode object.
88 @ivar log: holds the execution log and consists of tuples
89 of the form C{(log_serial, timestamp, level, message)}
90 @ivar input: the OpCode we encapsulate
91 @ivar status: the current status
92 @ivar result: the result of the LU execution
93 @ivar start_timestamp: timestamp for the start of the execution
94 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 @ivar stop_timestamp: timestamp for the end of the execution
98 __slots__ = ["input", "status", "result", "log", "priority",
99 "start_timestamp", "exec_timestamp", "end_timestamp",
102 def __init__(self, op):
103     """Constructor for the _QueuedOpCode.
105 @type op: L{opcodes.OpCode}
106 @param op: the opcode we encapsulate
110 self.status = constants.OP_STATUS_QUEUED
113 self.start_timestamp = None
114 self.exec_timestamp = None
115 self.end_timestamp = None
117 # Get initial priority (it might change during the lifetime of this opcode)
118 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
121 def Restore(cls, state):
122 """Restore the _QueuedOpCode from the serialized form.
125 @param state: the serialized state
126 @rtype: _QueuedOpCode
127 @return: a new _QueuedOpCode instance
130 obj = _QueuedOpCode.__new__(cls)
131 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132 obj.status = state["status"]
133 obj.result = state["result"]
134 obj.log = state["log"]
135 obj.start_timestamp = state.get("start_timestamp", None)
136 obj.exec_timestamp = state.get("exec_timestamp", None)
137 obj.end_timestamp = state.get("end_timestamp", None)
138 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
142 """Serializes this _QueuedOpCode.
145 @return: the dictionary holding the serialized state
149 "input": self.input.__getstate__(),
150 "status": self.status,
151 "result": self.result,
153 "start_timestamp": self.start_timestamp,
154 "exec_timestamp": self.exec_timestamp,
155 "end_timestamp": self.end_timestamp,
156 "priority": self.priority,
160 class _QueuedJob(object):
161 """In-memory job representation.
163 This is what we use to track the user-submitted jobs. Locking must
164 be taken care of by users of this class.
166 @type queue: L{JobQueue}
167 @ivar queue: the parent queue
170 @ivar ops: the list of _QueuedOpCode that constitute the job
171 @type log_serial: int
172 @ivar log_serial: holds the index for the next log entry
173 @ivar received_timestamp: the timestamp for when the job was received
174   @ivar start_timestamp: the timestamp for start of execution
175 @ivar end_timestamp: the timestamp for end of execution
178 # pylint: disable-msg=W0212
179 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 "received_timestamp", "start_timestamp", "end_timestamp",
183 def __init__(self, queue, job_id, ops):
184 """Constructor for the _QueuedJob.
186 @type queue: L{JobQueue}
187 @param queue: our parent queue
189 @param job_id: our job id
191 @param ops: the list of opcodes we hold, which will be encapsulated
196 raise errors.GenericError("A job needs at least one opcode")
200 self.ops = [_QueuedOpCode(op) for op in ops]
202 self.received_timestamp = TimeStampNow()
203 self.start_timestamp = None
204 self.end_timestamp = None
206 self._InitInMemory(self)
209 def _InitInMemory(obj):
210 """Initializes in-memory variables.
217 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
219 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
221 return "<%s at %#x>" % (" ".join(status), id(self))
224 def Restore(cls, queue, state):
225 """Restore a _QueuedJob from serialized state:
227 @type queue: L{JobQueue}
228 @param queue: to which queue the restored job belongs
230 @param state: the serialized state
232     @return: the restored _QueuedJob instance
235 obj = _QueuedJob.__new__(cls)
238 obj.received_timestamp = state.get("received_timestamp", None)
239 obj.start_timestamp = state.get("start_timestamp", None)
240 obj.end_timestamp = state.get("end_timestamp", None)
244 for op_state in state["ops"]:
245 op = _QueuedOpCode.Restore(op_state)
246 for log_entry in op.log:
247 obj.log_serial = max(obj.log_serial, log_entry[0])
250 cls._InitInMemory(obj)
255     """Serialize the _QueuedJob instance.
258 @return: the serialized state
263 "ops": [op.Serialize() for op in self.ops],
264 "start_timestamp": self.start_timestamp,
265 "end_timestamp": self.end_timestamp,
266 "received_timestamp": self.received_timestamp,
269 def CalcStatus(self):
270 """Compute the status of this job.
272 This function iterates over all the _QueuedOpCodes in the job and
273 based on their status, computes the job status.
276     - if we find a canceled opcode, or one that finished with an error,
277       the job status will be the same
278 - otherwise, the last opcode with the status one of:
283 will determine the job status
285 - otherwise, it means either all opcodes are queued, or success,
286 and the job status will be the same
288 @return: the job status
291 status = constants.JOB_STATUS_QUEUED
295 if op.status == constants.OP_STATUS_SUCCESS:
300 if op.status == constants.OP_STATUS_QUEUED:
302 elif op.status == constants.OP_STATUS_WAITLOCK:
303 status = constants.JOB_STATUS_WAITLOCK
304 elif op.status == constants.OP_STATUS_RUNNING:
305 status = constants.JOB_STATUS_RUNNING
306 elif op.status == constants.OP_STATUS_CANCELING:
307 status = constants.JOB_STATUS_CANCELING
309 elif op.status == constants.OP_STATUS_ERROR:
310 status = constants.JOB_STATUS_ERROR
311 # The whole job fails if one opcode failed
313 elif op.status == constants.OP_STATUS_CANCELED:
314         status = constants.JOB_STATUS_CANCELED
318 status = constants.JOB_STATUS_SUCCESS
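    # Illustrative example (not part of the original module), assuming a job
    # whose opcodes currently have the statuses [SUCCESS, RUNNING, QUEUED]:
    #
    #   job.CalcStatus()  # -> constants.JOB_STATUS_RUNNING
    #
    # a single ERROR or CANCELED opcode would instead make the whole job
    # JOB_STATUS_ERROR or JOB_STATUS_CANCELED, respectively.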
322 def CalcPriority(self):
323 """Gets the current priority for this job.
325 Only unfinished opcodes are considered. When all are done, the default
331 priorities = [op.priority for op in self.ops
332 if op.status not in constants.OPS_FINALIZED]
335 # All opcodes are done, assume default priority
336 return constants.OP_PRIO_DEFAULT
338 return min(priorities)
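    # Illustrative example (not part of the original module): with unfinished
    # opcode priorities [0, -5, 10], CalcPriority() returns -5; the numerically
    # smallest (i.e. most urgent) priority wins for the whole job.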
340 def GetLogEntries(self, newer_than):
341 """Selectively returns the log entries.
343 @type newer_than: None or int
344 @param newer_than: if this is None, return all log entries,
345 otherwise return only the log entries with serial higher
348 @return: the list of the log entries selected
351 if newer_than is None:
358 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
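    # Illustrative example (not part of the original module): if the job's log
    # contains entries with serials 1..5, then
    #
    #   job.GetLogEntries(None)  # -> all five entries
    #   job.GetLogEntries(3)     # -> only the entries with serials 4 and 5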
362 def GetInfo(self, fields):
363 """Returns information about a job.
366 @param fields: names of fields to return
368 @return: list with one element for each field
369 @raise errors.OpExecError: when an invalid field
377 elif fname == "status":
378 row.append(self.CalcStatus())
379 elif fname == "priority":
380 row.append(self.CalcPriority())
382 row.append([op.input.__getstate__() for op in self.ops])
383 elif fname == "opresult":
384 row.append([op.result for op in self.ops])
385 elif fname == "opstatus":
386 row.append([op.status for op in self.ops])
387 elif fname == "oplog":
388 row.append([op.log for op in self.ops])
389 elif fname == "opstart":
390 row.append([op.start_timestamp for op in self.ops])
391 elif fname == "opexec":
392 row.append([op.exec_timestamp for op in self.ops])
393 elif fname == "opend":
394 row.append([op.end_timestamp for op in self.ops])
395 elif fname == "oppriority":
396 row.append([op.priority for op in self.ops])
397 elif fname == "received_ts":
398 row.append(self.received_timestamp)
399 elif fname == "start_ts":
400 row.append(self.start_timestamp)
401 elif fname == "end_ts":
402 row.append(self.end_timestamp)
403 elif fname == "summary":
404 row.append([op.input.Summary() for op in self.ops])
406         raise errors.OpExecError("Invalid query field '%s'" % fname)
409 def MarkUnfinishedOps(self, status, result):
410 """Mark unfinished opcodes with a given status and result.
412     This is a utility function for marking all running or waiting to
413 be run opcodes with a given status. Opcodes which are already
414     finalized are not changed.
416 @param status: a given opcode status
417 @param result: the opcode result
422 if op.status in constants.OPS_FINALIZED:
423 assert not_marked, "Finalized opcodes found after non-finalized ones"
430 """Marks the job as finalized.
433 self.end_timestamp = TimeStampNow()
436     """Marks the job as canceled or canceling, if possible.
438 @rtype: tuple; (bool, string)
439     @return: a boolean describing whether the job was successfully canceled
440       or marked as canceling, and a text message
443 status = self.CalcStatus()
445 if status == constants.JOB_STATUS_QUEUED:
446 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447 "Job canceled by request")
449 return (True, "Job %s canceled" % self.id)
451 elif status == constants.JOB_STATUS_WAITLOCK:
452 # The worker will notice the new status and cancel the job
453 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454 return (True, "Job %s will be canceled" % self.id)
457 logging.debug("Job %s is no longer waiting in the queue", self.id)
458 return (False, "Job %s is no longer waiting in the queue" % self.id)
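  # Illustrative summary (not part of the original module) of Cancel()'s
  # possible outcomes:
  #
  #   (True,  "Job 123 canceled")           # job was still queued
  #   (True,  "Job 123 will be canceled")   # job was waiting for locks
  #   (False, "Job 123 is no longer waiting in the queue")  # running/finished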
461 class _OpExecCallbacks(mcpu.OpExecCbBase):
462 def __init__(self, queue, job, op):
463 """Initializes this class.
465 @type queue: L{JobQueue}
466 @param queue: Job queue
467 @type job: L{_QueuedJob}
468 @param job: Job object
469 @type op: L{_QueuedOpCode}
473 assert queue, "Queue is missing"
474 assert job, "Job is missing"
475 assert op, "Opcode is missing"
481 def _CheckCancel(self):
482 """Raises an exception to cancel the job if asked to.
485 # Cancel here if we were asked to
486 if self._op.status == constants.OP_STATUS_CANCELING:
487 logging.debug("Canceling opcode")
490 @locking.ssynchronized(_QUEUE, shared=1)
491 def NotifyStart(self):
492 """Mark the opcode as running, not lock-waiting.
494 This is called from the mcpu code as a notifier function, when the LU is
495 finally about to start the Exec() method. Of course, to have end-user
496 visible results, the opcode must be initially (before calling into
497 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
500 assert self._op in self._job.ops
501 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
502 constants.OP_STATUS_CANCELING)
504 # Cancel here if we were asked to
507 logging.debug("Opcode is now running")
509 self._op.status = constants.OP_STATUS_RUNNING
510 self._op.exec_timestamp = TimeStampNow()
512 # And finally replicate the job status
513 self._queue.UpdateJobUnlocked(self._job)
515 @locking.ssynchronized(_QUEUE, shared=1)
516 def _AppendFeedback(self, timestamp, log_type, log_msg):
517 """Internal feedback append function, with locks
520 self._job.log_serial += 1
521 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522 self._queue.UpdateJobUnlocked(self._job, replicate=False)
524 def Feedback(self, *args):
525 """Append a log entry.
531 log_type = constants.ELOG_MESSAGE
534 (log_type, log_msg) = args
536 # The time is split to make serialization easier and not lose
538 timestamp = utils.SplitTime(time.time())
539 self._AppendFeedback(timestamp, log_type, log_msg)
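    # Illustrative usage sketch (not part of the original module): an LU can
    # call the callback either with a single message or with an explicit log
    # type constant, e.g.
    #
    #   cbs.Feedback("resyncing disks")
    #   cbs.Feedback(constants.ELOG_MESSAGE, "resyncing disks")
    #
    # either way the entry is stored as a (serial, (sec, usec), type, msg)
    # tuple in the opcode log and written to disk without replication
    # (replicate=False).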
541 def CheckCancel(self):
542 """Check whether job has been cancelled.
545 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
546 constants.OP_STATUS_CANCELING)
548 # Cancel here if we were asked to
552 class _JobChangesChecker(object):
553 def __init__(self, fields, prev_job_info, prev_log_serial):
554 """Initializes this class.
556 @type fields: list of strings
557 @param fields: Fields requested by LUXI client
558     @type prev_job_info: list or None
559 @param prev_job_info: previous job info, as passed by the LUXI client
560     @type prev_log_serial: int
561 @param prev_log_serial: previous job serial, as passed by the LUXI client
564 self._fields = fields
565 self._prev_job_info = prev_job_info
566 self._prev_log_serial = prev_log_serial
568 def __call__(self, job):
569 """Checks whether job has changed.
571 @type job: L{_QueuedJob}
572 @param job: Job object
575 status = job.CalcStatus()
576 job_info = job.GetInfo(self._fields)
577 log_entries = job.GetLogEntries(self._prev_log_serial)
579 # Serializing and deserializing data can cause type changes (e.g. from
580 # tuple to list) or precision loss. We're doing it here so that we get
581 # the same modifications as the data received from the client. Without
582 # this, the comparison afterwards might fail without the data being
583 # significantly different.
584 # TODO: we just deserialized from disk, investigate how to make sure that
585 # the job info and log entries are compatible to avoid this further step.
586 # TODO: Doing something like in testutils.py:UnifyValueType might be more
587 # efficient, though floats will be tricky
588 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
589 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
591 # Don't even try to wait if the job is no longer running, there will be
593 if (status not in (constants.JOB_STATUS_QUEUED,
594 constants.JOB_STATUS_RUNNING,
595 constants.JOB_STATUS_WAITLOCK) or
596 job_info != self._prev_job_info or
597 (log_entries and self._prev_log_serial != log_entries[0][0])):
598 logging.debug("Job %s changed", job.id)
599 return (job_info, log_entries)
604 class _JobFileChangesWaiter(object):
605 def __init__(self, filename):
606 """Initializes this class.
608 @type filename: string
609 @param filename: Path to job file
610     @raises errors.InotifyError: if the notifier cannot be set up
613 self._wm = pyinotify.WatchManager()
614 self._inotify_handler = \
615 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
617 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
619 self._inotify_handler.enable()
621 # pyinotify doesn't close file descriptors automatically
622 self._notifier.stop()
625 def _OnInotify(self, notifier_enabled):
626 """Callback for inotify.
629 if not notifier_enabled:
630 self._inotify_handler.enable()
632 def Wait(self, timeout):
633 """Waits for the job file to change.
636 @param timeout: Timeout in seconds
637 @return: Whether there have been events
641 have_events = self._notifier.check_events(timeout * 1000)
643 self._notifier.read_events()
644 self._notifier.process_events()
648 """Closes underlying notifier and its file descriptor.
651 self._notifier.stop()
654 class _JobChangesWaiter(object):
655 def __init__(self, filename):
656 """Initializes this class.
658 @type filename: string
659 @param filename: Path to job file
662 self._filewaiter = None
663 self._filename = filename
665 def Wait(self, timeout):
666 """Waits for a job to change.
669 @param timeout: Timeout in seconds
670 @return: Whether there have been events
674 return self._filewaiter.Wait(timeout)
676 # Lazy setup: Avoid inotify setup cost when job file has already changed.
677 # If this point is reached, return immediately and let caller check the job
678 # file again in case there were changes since the last check. This avoids a
680 self._filewaiter = _JobFileChangesWaiter(self._filename)
685 """Closes underlying waiter.
689 self._filewaiter.Close()
692 class _WaitForJobChangesHelper(object):
693 """Helper class using inotify to wait for changes in a job file.
695 This class takes a previous job status and serial, and alerts the client when
696 the current job status has changed.
700 def _CheckForChanges(job_load_fn, check_fn):
703 raise errors.JobLost()
705 result = check_fn(job)
707 raise utils.RetryAgain()
711 def __call__(self, filename, job_load_fn,
712 fields, prev_job_info, prev_log_serial, timeout):
713 """Waits for changes on a job.
715 @type filename: string
716 @param filename: File on which to wait for changes
717 @type job_load_fn: callable
718 @param job_load_fn: Function to load job
719 @type fields: list of strings
720 @param fields: Which fields to check for changes
721 @type prev_job_info: list or None
722 @param prev_job_info: Last job information returned
723 @type prev_log_serial: int
724 @param prev_log_serial: Last job message serial number
726 @param timeout: maximum time to wait in seconds
730 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
731 waiter = _JobChangesWaiter(filename)
733 return utils.Retry(compat.partial(self._CheckForChanges,
734 job_load_fn, check_fn),
735 utils.RETRY_REMAINING_TIME, timeout,
739 except (errors.InotifyError, errors.JobLost):
741 except utils.RetryTimeout:
742 return constants.JOB_NOTCHANGED
745 def _EncodeOpError(err):
746 """Encodes an error which occurred while processing an opcode.
749 if isinstance(err, errors.GenericError):
752 to_encode = errors.OpExecError(str(err))
754 return errors.EncodeException(to_encode)
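# Illustrative example (not part of the original module): an arbitrary
# exception such as ValueError("boom") is wrapped in errors.OpExecError
# before encoding, while an errors.GenericError subclass is encoded as-is:
#
#   _EncodeOpError(ValueError("boom"))         # encoded OpExecError("boom")
#   _EncodeOpError(errors.OpPrereqError("x"))  # encoded OpPrereqError("x")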
757 class _TimeoutStrategyWrapper:
758 def __init__(self, fn):
759 """Initializes this class.
766 """Gets the next timeout if necessary.
769 if self._next is None:
770 self._next = self._fn()
773 """Returns the next timeout.
780 """Returns the current timeout and advances the internal state.
789 class _OpExecContext:
790 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
791 """Initializes this class.
796 self.log_prefix = log_prefix
797 self.summary = op.input.Summary()
799 self._timeout_strategy_factory = timeout_strategy_factory
800 self._ResetTimeoutStrategy()
802 def _ResetTimeoutStrategy(self):
803 """Creates a new timeout strategy.
806 self._timeout_strategy = \
807 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
809 def CheckPriorityIncrease(self):
810 """Checks whether priority can and should be increased.
812 Called when locks couldn't be acquired.
817 # Exhausted all retries and next round should not use blocking acquire
819 if (self._timeout_strategy.Peek() is None and
820 op.priority > constants.OP_PRIO_HIGHEST):
821 logging.debug("Increasing priority")
823 self._ResetTimeoutStrategy()
828 def GetNextLockTimeout(self):
829 """Returns the next lock acquire timeout.
832 return self._timeout_strategy.Next()
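  # Illustrative sketch (not part of the original module) of how _JobProcessor
  # is expected to drive this context: it calls GetNextLockTimeout() before
  # each lock acquisition attempt and, after a timeout, CheckPriorityIncrease();
  # once the timeout strategy is exhausted (Peek() returns None) the opcode
  # priority is raised one step towards OP_PRIO_HIGHEST and a fresh timeout
  # strategy is installed for the next round.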
835 class _JobProcessor(object):
836 def __init__(self, queue, opexec_fn, job,
837 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
838 """Initializes this class.
842 self.opexec_fn = opexec_fn
844 self._timeout_strategy_factory = _timeout_strategy_factory
847 def _FindNextOpcode(job, timeout_strategy_factory):
848 """Locates the next opcode to run.
850 @type job: L{_QueuedJob}
851 @param job: Job object
852 @param timeout_strategy_factory: Callable to create new timeout strategy
855 # Create some sort of a cache to speed up locating next opcode for future
857 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
858 # pending and one for processed ops.
859 if job.ops_iter is None:
860 job.ops_iter = enumerate(job.ops)
862 # Find next opcode to run
865 (idx, op) = job.ops_iter.next()
866 except StopIteration:
867 raise errors.ProgrammerError("Called for a finished job")
869 if op.status == constants.OP_STATUS_RUNNING:
870 # Found an opcode already marked as running
871 raise errors.ProgrammerError("Called for job marked as running")
873 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
874 timeout_strategy_factory)
876 if op.status not in constants.OPS_FINALIZED:
879 # This is a job that was partially completed before master daemon
880 # shutdown, so it can be expected that some opcodes are already
881 # completed successfully (if any did error out, then the whole job
882 # should have been aborted and not resubmitted for processing).
883 logging.info("%s: opcode %s already processed, skipping",
884 opctx.log_prefix, opctx.summary)
887 def _MarkWaitlock(job, op):
888 """Marks an opcode as waiting for locks.
890 The job's start timestamp is also set if necessary.
892 @type job: L{_QueuedJob}
893 @param job: Job object
894 @type op: L{_QueuedOpCode}
895 @param op: Opcode object
899 assert op.status in (constants.OP_STATUS_QUEUED,
900 constants.OP_STATUS_WAITLOCK)
906 if op.status == constants.OP_STATUS_QUEUED:
907 op.status = constants.OP_STATUS_WAITLOCK
910 if op.start_timestamp is None:
911 op.start_timestamp = TimeStampNow()
914 if job.start_timestamp is None:
915 job.start_timestamp = op.start_timestamp
918 assert op.status == constants.OP_STATUS_WAITLOCK
922 def _ExecOpCodeUnlocked(self, opctx):
923 """Processes one opcode and returns the result.
928 assert op.status == constants.OP_STATUS_WAITLOCK
930 timeout = opctx.GetNextLockTimeout()
933 # Make sure not to hold queue lock while calling ExecOpCode
934 result = self.opexec_fn(op.input,
935 _OpExecCallbacks(self.queue, self.job, op),
936 timeout=timeout, priority=op.priority)
937 except mcpu.LockAcquireTimeout:
938 assert timeout is not None, "Received timeout for blocking acquire"
939 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
941 assert op.status in (constants.OP_STATUS_WAITLOCK,
942 constants.OP_STATUS_CANCELING)
944 # Was job cancelled while we were waiting for the lock?
945 if op.status == constants.OP_STATUS_CANCELING:
946 return (constants.OP_STATUS_CANCELING, None)
948 # Stay in waitlock while trying to re-acquire lock
949 return (constants.OP_STATUS_WAITLOCK, None)
951 logging.exception("%s: Canceling job", opctx.log_prefix)
952 assert op.status == constants.OP_STATUS_CANCELING
953 return (constants.OP_STATUS_CANCELING, None)
954 except Exception, err: # pylint: disable-msg=W0703
955 logging.exception("%s: Caught exception in %s",
956 opctx.log_prefix, opctx.summary)
957 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
959 logging.debug("%s: %s successful",
960 opctx.log_prefix, opctx.summary)
961 return (constants.OP_STATUS_SUCCESS, result)
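  # Illustrative summary (not part of the original module) of the
  # (status, result) pairs _ExecOpCodeUnlocked can return:
  #
  #   (OP_STATUS_SUCCESS,   <LU result>)       # opcode finished
  #   (OP_STATUS_WAITLOCK,  None)              # lock timeout, retry later
  #   (OP_STATUS_CANCELING, None)              # job is being canceled
  #   (OP_STATUS_ERROR,     <encoded error>)   # opcode raised an exception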
963 def __call__(self, _nextop_fn=None):
964 """Continues execution of a job.
966 @param _nextop_fn: Callback function for tests
968 @return: True if job is finished, False if processor needs to be called
975 logging.debug("Processing job %s", job.id)
977 queue.acquire(shared=1)
979 opcount = len(job.ops)
981 # Don't do anything for finalized jobs
982 if job.CalcStatus() in constants.JOBS_FINALIZED:
985 # Is a previous opcode still pending?
987 opctx = job.cur_opctx
990 if __debug__ and _nextop_fn:
992 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
997 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998 constants.OP_STATUS_CANCELING)
999 for i in job.ops[opctx.index + 1:])
1001 assert op.status in (constants.OP_STATUS_QUEUED,
1002 constants.OP_STATUS_WAITLOCK,
1003 constants.OP_STATUS_CANCELING)
1005 assert (op.priority <= constants.OP_PRIO_LOWEST and
1006 op.priority >= constants.OP_PRIO_HIGHEST)
1008 if op.status != constants.OP_STATUS_CANCELING:
1009 assert op.status in (constants.OP_STATUS_QUEUED,
1010 constants.OP_STATUS_WAITLOCK)
1012 # Prepare to start opcode
1013 if self._MarkWaitlock(job, op):
1015 queue.UpdateJobUnlocked(job)
1017 assert op.status == constants.OP_STATUS_WAITLOCK
1018 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1019 assert job.start_timestamp and op.start_timestamp
1021 logging.info("%s: opcode %s waiting for locks",
1022 opctx.log_prefix, opctx.summary)
1026 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1028 queue.acquire(shared=1)
1030 op.status = op_status
1031 op.result = op_result
1033 if op.status == constants.OP_STATUS_WAITLOCK:
1034 # Couldn't get locks in time
1035 assert not op.end_timestamp
1038 op.end_timestamp = TimeStampNow()
1040 if op.status == constants.OP_STATUS_CANCELING:
1041 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1042 for i in job.ops[opctx.index:])
1044 assert op.status in constants.OPS_FINALIZED
1046 if op.status == constants.OP_STATUS_WAITLOCK:
1049 if opctx.CheckPriorityIncrease():
1050 # Priority was changed, need to update on-disk file
1051 queue.UpdateJobUnlocked(job)
1053 # Keep around for another round
1054 job.cur_opctx = opctx
1056 assert (op.priority <= constants.OP_PRIO_LOWEST and
1057 op.priority >= constants.OP_PRIO_HIGHEST)
1059 # In no case must the status be finalized here
1060 assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1063 # Ensure all opcodes so far have been successful
1064 assert (opctx.index == 0 or
1065 compat.all(i.status == constants.OP_STATUS_SUCCESS
1066 for i in job.ops[:opctx.index]))
1069 job.cur_opctx = None
1071 if op.status == constants.OP_STATUS_SUCCESS:
1074 elif op.status == constants.OP_STATUS_ERROR:
1075 # Ensure failed opcode has an exception as its result
1076 assert errors.GetEncodedError(job.ops[opctx.index].result)
1078 to_encode = errors.OpExecError("Preceding opcode failed")
1079 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1080 _EncodeOpError(to_encode))
1084 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1085 errors.GetEncodedError(i.result)
1086 for i in job.ops[opctx.index:])
1088 elif op.status == constants.OP_STATUS_CANCELING:
1089 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1090 "Job canceled by request")
1094 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1096 if opctx.index == (opcount - 1):
1097 # Finalize on last opcode
1101 # All opcodes have been run, finalize job
1104 # Write to disk. If the job status is final, this is the final write
1105 # allowed. Once the file has been written, it can be archived anytime.
1106 queue.UpdateJobUnlocked(job)
1109 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1117 class _JobQueueWorker(workerpool.BaseWorker):
1118 """The actual job workers.
1121 def RunTask(self, job): # pylint: disable-msg=W0221
1124     This function processes a job. It is closely tied to the L{_QueuedJob} and
1125 L{_QueuedOpCode} classes.
1127 @type job: L{_QueuedJob}
1128 @param job: the job to be processed
1132 assert queue == self.pool.queue
1134 self.SetTaskName("Job%s" % job.id)
1136 proc = mcpu.Processor(queue.context, job.id)
1138 if not _JobProcessor(queue, proc.ExecOpCode, job)():
1140 raise workerpool.DeferTask(priority=job.CalcPriority())
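    # Illustrative note (not part of the original module): returning False
    # from _JobProcessor means the job still has opcodes left, so the task is
    # re-queued via workerpool.DeferTask at the job's current priority instead
    # of blocking this worker thread until the job finishes.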
1143 class _JobQueueWorkerPool(workerpool.WorkerPool):
1144 """Simple class implementing a job-processing workerpool.
1147 def __init__(self, queue):
1148 super(_JobQueueWorkerPool, self).__init__("JobQueue",
1154 def _RequireOpenQueue(fn):
1155 """Decorator for "public" functions.
1157     This decorator should be used for all 'public' functions; that is,
1158     functions usually called from other classes. Note that it should
1159     be applied only to methods (not plain functions), since it expects
1160     the decorated function to be called with a first argument (C{self})
1161     that has a '_queue_filelock' attribute.
1163 @warning: Use this decorator only after locking.ssynchronized
1166 @locking.ssynchronized(_LOCK)
1172 def wrapper(self, *args, **kwargs):
1173 # pylint: disable-msg=W0212
1174 assert self._queue_filelock is not None, "Queue should be open"
1175 return fn(self, *args, **kwargs)
1179 class JobQueue(object):
1180 """Queue used to manage the jobs.
1182 @cvar _RE_JOB_FILE: regex matching the valid job file names
1185 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1187 def __init__(self, context):
1188 """Constructor for JobQueue.
1190 The constructor will initialize the job queue object and then
1191 start loading the current jobs from disk, either for starting them
1192     (if they were queued) or for aborting them (if they were already
1195 @type context: GanetiContext
1196 @param context: the context object for access to the configuration
1197 data and other ganeti objects
1200 self.context = context
1201 self._memcache = weakref.WeakValueDictionary()
1202 self._my_hostname = netutils.Hostname.GetSysName()
1204     # The Big JobQueue lock. If a code block or method acquires it in shared
1205     # mode, it must guarantee concurrency with all the code acquiring it in
1206     # shared mode, including itself. In order not to acquire it at all,
1207     # concurrency must be guaranteed with all code acquiring it in shared mode
1208     # and all code acquiring it exclusively.
1209 self._lock = locking.SharedLock("JobQueue")
1211 self.acquire = self._lock.acquire
1212 self.release = self._lock.release
1214 # Initialize the queue, and acquire the filelock.
1215 # This ensures no other process is working on the job queue.
1216 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1219 self._last_serial = jstore.ReadSerial()
1220 assert self._last_serial is not None, ("Serial file was modified between"
1221 " check in jstore and here")
1223 # Get initial list of nodes
1224 self._nodes = dict((n.name, n.primary_ip)
1225 for n in self.context.cfg.GetAllNodesInfo().values()
1226 if n.master_candidate)
1228 # Remove master node
1229 self._nodes.pop(self._my_hostname, None)
1231 # TODO: Check consistency across nodes
1233 self._queue_size = 0
1234 self._UpdateQueueSizeUnlocked()
1235 self._drained = self._IsQueueMarkedDrain()
1238 self._wpool = _JobQueueWorkerPool(self)
1240 self._InspectQueue()
1242 self._wpool.TerminateWorkers()
1245 @locking.ssynchronized(_LOCK)
1247 def _InspectQueue(self):
1248 """Loads the whole job queue and resumes unfinished jobs.
1250 This function needs the lock here because WorkerPool.AddTask() may start a
1251 job while we're still doing our work.
1254 logging.info("Inspecting job queue")
1258 all_job_ids = self._GetJobIDsUnlocked()
1259 jobs_count = len(all_job_ids)
1260 lastinfo = time.time()
1261 for idx, job_id in enumerate(all_job_ids):
1262 # Give an update every 1000 jobs or 10 seconds
1263 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1264 idx == (jobs_count - 1)):
1265 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1266 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1267 lastinfo = time.time()
1269 job = self._LoadJobUnlocked(job_id)
1271 # a failure in loading the job can cause 'None' to be returned
1275 status = job.CalcStatus()
1277 if status == constants.JOB_STATUS_QUEUED:
1278 restartjobs.append(job)
1280 elif status in (constants.JOB_STATUS_RUNNING,
1281 constants.JOB_STATUS_WAITLOCK,
1282 constants.JOB_STATUS_CANCELING):
1283 logging.warning("Unfinished job %s found: %s", job.id, job)
1285 if status == constants.JOB_STATUS_WAITLOCK:
1287 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1288 restartjobs.append(job)
1290 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1291 "Unclean master daemon shutdown")
1293 self.UpdateJobUnlocked(job)
1296 logging.info("Restarting %s jobs", len(restartjobs))
1297 self._EnqueueJobs(restartjobs)
1299 logging.info("Job queue inspection finished")
1301 @locking.ssynchronized(_LOCK)
1303 def AddNode(self, node):
1304 """Register a new node with the queue.
1306 @type node: L{objects.Node}
1307 @param node: the node object to be added
1310 node_name = node.name
1311 assert node_name != self._my_hostname
1313 # Clean queue directory on added node
1314 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1315 msg = result.fail_msg
1317 logging.warning("Cannot cleanup queue directory on node %s: %s",
1320 if not node.master_candidate:
1321 # remove if existing, ignoring errors
1322 self._nodes.pop(node_name, None)
1323 # and skip the replication of the job ids
1326 # Upload the whole queue excluding archived jobs
1327 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1329 # Upload current serial file
1330 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1332 for file_name in files:
1334 content = utils.ReadFile(file_name)
1336 result = rpc.RpcRunner.call_jobqueue_update([node_name],
1339 msg = result[node_name].fail_msg
1341 logging.error("Failed to upload file %s to node %s: %s",
1342 file_name, node_name, msg)
1344 self._nodes[node_name] = node.primary_ip
1346 @locking.ssynchronized(_LOCK)
1348 def RemoveNode(self, node_name):
1349 """Callback called when removing nodes from the cluster.
1351 @type node_name: str
1352 @param node_name: the name of the node to remove
1355 self._nodes.pop(node_name, None)
1358 def _CheckRpcResult(result, nodes, failmsg):
1359 """Verifies the status of an RPC call.
1361     Since we aim to keep consistency should this node (the current
1362     master) fail, we will log errors if our RPC calls fail, and especially
1363     log the case when more than half of the nodes fail.
1365 @param result: the data as returned from the rpc call
1367 @param nodes: the list of nodes we made the call to
1369 @param failmsg: the identifier to be used for logging
1376 msg = result[node].fail_msg
1379 logging.error("RPC call %s (%s) failed on node %s: %s",
1380 result[node].call, failmsg, node, msg)
1382 success.append(node)
1384 # +1 for the master node
1385 if (len(success) + 1) < len(failed):
1386 # TODO: Handle failing nodes
1387 logging.error("More than half of the nodes failed")
1389 def _GetNodeIp(self):
1390 """Helper for returning the node name/ip list.
1392 @rtype: (list, list)
1393 @return: a tuple of two lists, the first one with the node
1394 names and the second one with the node addresses
1397 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1398 name_list = self._nodes.keys()
1399 addr_list = [self._nodes[name] for name in name_list]
1400 return name_list, addr_list
1402 def _UpdateJobQueueFile(self, file_name, data, replicate):
1403 """Writes a file locally and then replicates it to all nodes.
1405 This function will replace the contents of a file on the local
1406 node and then replicate it to all the other nodes we have.
1408 @type file_name: str
1409 @param file_name: the path of the file to be replicated
1411 @param data: the new contents of the file
1412 @type replicate: boolean
1413 @param replicate: whether to spread the changes to the remote nodes
1416 getents = runtime.GetEnts()
1417 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1418 gid=getents.masterd_gid)
1421 names, addrs = self._GetNodeIp()
1422 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1423 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1425 def _RenameFilesUnlocked(self, rename):
1426 """Renames a file locally and then replicate the change.
1428 This function will rename a file in the local queue directory
1429 and then replicate this rename to all the other nodes we have.
1431 @type rename: list of (old, new)
1432 @param rename: List containing tuples mapping old to new names
1435 # Rename them locally
1436 for old, new in rename:
1437 utils.RenameFile(old, new, mkdir=True)
1439 # ... and on all nodes
1440 names, addrs = self._GetNodeIp()
1441 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1442 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1445 def _FormatJobID(job_id):
1446 """Convert a job ID to string format.
1448     Currently this just does C{str(job_id)} after performing some
1449     checks, but if we want to change the job id format, this method
1450     will abstract that change.
1452 @type job_id: int or long
1453 @param job_id: the numeric job id
1455 @return: the formatted job id
1458 if not isinstance(job_id, (int, long)):
1459 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1461 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1466 def _GetArchiveDirectory(cls, job_id):
1467 """Returns the archive directory for a job.
1470 @param job_id: Job identifier
1472 @return: Directory name
1475 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
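    # Illustrative example (not part of the original module): with the default
    # JOBS_PER_ARCHIVE_DIRECTORY of 10000, job "12345" maps to archive
    # directory "1" and job "9999" to directory "0".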
1477 def _NewSerialsUnlocked(self, count):
1478 """Generates a new job identifier.
1480 Job identifiers are unique during the lifetime of a cluster.
1482 @type count: integer
1483 @param count: how many serials to return
1485     @return: a list of strings representing the new job identifiers.
1490 serial = self._last_serial + count
1493 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1494 "%s\n" % serial, True)
1496 result = [self._FormatJobID(v)
1497               for v in range(self._last_serial + 1, serial + 1)]
1498 # Keep it only if we were able to write the file
1499 self._last_serial = serial
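    # Illustrative example (not part of the original module): with
    # _last_serial == 42, _NewSerialsUnlocked(3) writes "45\n" to the serial
    # file and returns the formatted job IDs for 43, 44 and 45.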
1504 def _GetJobPath(job_id):
1505 """Returns the job file for a given job id.
1508 @param job_id: the job identifier
1510 @return: the path to the job file
1513 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1516 def _GetArchivedJobPath(cls, job_id):
1517     """Returns the archived job file for a given job id.
1520 @param job_id: the job identifier
1522 @return: the path to the archived job file
1525 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1526 cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
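    # Illustrative example (not part of the original module): job "12345"
    # would be archived as <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345, mirroring the
    # live path <QUEUE_DIR>/job-12345 returned by _GetJobPath().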
1528 def _GetJobIDsUnlocked(self, sort=True):
1529 """Return all known job IDs.
1531 The method only looks at disk because it's a requirement that all
1532 jobs are present on disk (so in the _memcache we don't have any
1536 @param sort: perform sorting on the returned job ids
1538 @return: the list of job IDs
1542 for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1543 m = self._RE_JOB_FILE.match(filename)
1545 jlist.append(m.group(1))
1547 jlist = utils.NiceSort(jlist)
1550 def _LoadJobUnlocked(self, job_id):
1551 """Loads a job from the disk or memory.
1553 Given a job id, this will return the cached job object if
1554 existing, or try to load the job from the disk. If loading from
1555 disk, it will also add the job to the cache.
1557 @param job_id: the job id
1558 @rtype: L{_QueuedJob} or None
1559 @return: either None or the job object
1562 job = self._memcache.get(job_id, None)
1564 logging.debug("Found job %s in memcache", job_id)
1568 job = self._LoadJobFromDisk(job_id)
1571 except errors.JobFileCorrupted:
1572 old_path = self._GetJobPath(job_id)
1573 new_path = self._GetArchivedJobPath(job_id)
1574 if old_path == new_path:
1575 # job already archived (future case)
1576 logging.exception("Can't parse job %s", job_id)
1579 logging.exception("Can't parse job %s, will archive.", job_id)
1580 self._RenameFilesUnlocked([(old_path, new_path)])
1583 self._memcache[job_id] = job
1584 logging.debug("Added job %s to the cache", job_id)
1587 def _LoadJobFromDisk(self, job_id):
1588 """Load the given job file from disk.
1590 Given a job file, read, load and restore it in a _QueuedJob format.
1592 @type job_id: string
1593 @param job_id: job identifier
1594 @rtype: L{_QueuedJob} or None
1595 @return: either None or the job object
1598 filepath = self._GetJobPath(job_id)
1599 logging.debug("Loading job from %s", filepath)
1601 raw_data = utils.ReadFile(filepath)
1602 except EnvironmentError, err:
1603 if err.errno in (errno.ENOENT, ):
1608 data = serializer.LoadJson(raw_data)
1609 job = _QueuedJob.Restore(self, data)
1610 except Exception, err: # pylint: disable-msg=W0703
1611 raise errors.JobFileCorrupted(err)
1615 def SafeLoadJobFromDisk(self, job_id):
1616 """Load the given job file from disk.
1618 Given a job file, read, load and restore it in a _QueuedJob format.
1619     In case of an error reading the job, None is returned and the
1620     exception is logged.
1622 @type job_id: string
1623 @param job_id: job identifier
1624 @rtype: L{_QueuedJob} or None
1625 @return: either None or the job object
1629 return self._LoadJobFromDisk(job_id)
1630 except (errors.JobFileCorrupted, EnvironmentError):
1631 logging.exception("Can't load/parse job %s", job_id)
1635 def _IsQueueMarkedDrain():
1636     """Check if the queue is marked for drain.
1638 This currently uses the queue drain file, which makes it a
1639 per-node flag. In the future this can be moved to the config file.
1642     @return: True if the job queue is marked for draining
1645 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1647 def _UpdateQueueSizeUnlocked(self):
1648 """Update the queue size.
1651 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1653 @locking.ssynchronized(_LOCK)
1655 def SetDrainFlag(self, drain_flag):
1656 """Sets the drain flag for the queue.
1658 @type drain_flag: boolean
1659 @param drain_flag: Whether to set or unset the drain flag
1662 getents = runtime.GetEnts()
1665 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1666 uid=getents.masterd_uid, gid=getents.masterd_gid)
1668 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1670 self._drained = drain_flag
1675 def _SubmitJobUnlocked(self, job_id, ops):
1676 """Create and store a new job.
1678 This enters the job into our job queue and also puts it on the new
1679 queue, in order for it to be picked up by the queue processors.
1681 @type job_id: job ID
1682 @param job_id: the job ID for the new job
1684 @param ops: The list of OpCodes that will become the new job.
1685 @rtype: L{_QueuedJob}
1686 @return: the job object to be queued
1687 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1688 @raise errors.JobQueueFull: if the job queue has too many jobs in it
1689 @raise errors.GenericError: If an opcode is not valid
1692 # Ok when sharing the big job queue lock, as the drain file is created when
1693 # the lock is exclusive.
1695 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1697 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1698 raise errors.JobQueueFull()
1700 job = _QueuedJob(self, job_id, ops)
1703 for idx, op in enumerate(job.ops):
1704 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1705 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1706 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1707 " are %s" % (idx, op.priority, allowed))
1710 self.UpdateJobUnlocked(job)
1712 self._queue_size += 1
1714 logging.debug("Adding new job %s to the cache", job_id)
1715 self._memcache[job_id] = job
1719 @locking.ssynchronized(_LOCK)
1721 def SubmitJob(self, ops):
1722 """Create and store a new job.
1724 @see: L{_SubmitJobUnlocked}
1727 job_id = self._NewSerialsUnlocked(1)[0]
1728 self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1731 @locking.ssynchronized(_LOCK)
1733 def SubmitManyJobs(self, jobs):
1734 """Create and store multiple jobs.
1736 @see: L{_SubmitJobUnlocked}
1741 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1742 for job_id, ops in zip(all_job_ids, jobs):
1744 added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1747 except errors.GenericError, err:
1750 results.append((status, data))
1752 self._EnqueueJobs(added_jobs)
1756 def _EnqueueJobs(self, jobs):
1757 """Helper function to add jobs to worker pool's queue.
1760 @param jobs: List of all jobs
1763 self._wpool.AddManyTasks([(job, ) for job in jobs],
1764 priority=[job.CalcPriority() for job in jobs])
1767 def UpdateJobUnlocked(self, job, replicate=True):
1768 """Update a job's on disk storage.
1770 After a job has been modified, this function needs to be called in
1771 order to write the changes to disk and replicate them to the other
1774 @type job: L{_QueuedJob}
1775 @param job: the changed job
1776 @type replicate: boolean
1777 @param replicate: whether to replicate the change to remote nodes
1781 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1782 assert (finalized ^ (job.end_timestamp is None))
1784 filename = self._GetJobPath(job.id)
1785 data = serializer.DumpJson(job.Serialize(), indent=False)
1786 logging.debug("Writing job %s to %s", job.id, filename)
1787 self._UpdateJobQueueFile(filename, data, replicate)
1789 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1791 """Waits for changes in a job.
1793 @type job_id: string
1794 @param job_id: Job identifier
1795 @type fields: list of strings
1796 @param fields: Which fields to check for changes
1797 @type prev_job_info: list or None
1798 @param prev_job_info: Last job information returned
1799 @type prev_log_serial: int
1800 @param prev_log_serial: Last job message serial number
1801 @type timeout: float
1802 @param timeout: maximum time to wait in seconds
1803 @rtype: tuple (job info, log entries)
1804 @return: a tuple of the job information as required via
1805 the fields parameter, and the log entries as a list
1807 if the job has not changed and the timeout has expired,
1808 we instead return a special value,
1809 L{constants.JOB_NOTCHANGED}, which should be interpreted
1810 as such by the clients
1813 load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1815 helper = _WaitForJobChangesHelper()
1817 return helper(self._GetJobPath(job_id), load_fn,
1818 fields, prev_job_info, prev_log_serial, timeout)
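    # Illustrative client-side sketch (not part of the original module): a
    # LUXI client typically polls in a loop along the lines of
    #
    #   while True:
    #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
    #                                      prev_serial, timeout)
    #     if result == constants.JOB_NOTCHANGED:
    #       continue                      # timeout expired, ask again
    #     (prev_info, log_entries) = result
    #
    # a None result would mean the job file disappeared; the names above are
    # only for illustration, the real client code lives outside this module.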
1820 @locking.ssynchronized(_LOCK)
1822 def CancelJob(self, job_id):
1825 This will only succeed if the job has not started yet.
1827 @type job_id: string
1828 @param job_id: job ID of job to be cancelled.
1831 logging.info("Cancelling job %s", job_id)
1833 job = self._LoadJobUnlocked(job_id)
1835 logging.debug("Job %s not found", job_id)
1836 return (False, "Job %s not found" % job_id)
1838 (success, msg) = job.Cancel()
1841 # If the job was finalized (e.g. cancelled), this is the final write
1842 # allowed. The job can be archived anytime.
1843 self.UpdateJobUnlocked(job)
1845 return (success, msg)
1848 def _ArchiveJobsUnlocked(self, jobs):
1851 @type jobs: list of L{_QueuedJob}
1852 @param jobs: Job objects
1854 @return: Number of archived jobs
1860 if job.CalcStatus() not in constants.JOBS_FINALIZED:
1861 logging.debug("Job %s is not yet done", job.id)
1864 archive_jobs.append(job)
1866 old = self._GetJobPath(job.id)
1867 new = self._GetArchivedJobPath(job.id)
1868 rename_files.append((old, new))
1870 # TODO: What if 1..n files fail to rename?
1871 self._RenameFilesUnlocked(rename_files)
1873 logging.debug("Successfully archived job(s) %s",
1874 utils.CommaJoin(job.id for job in archive_jobs))
1876 # Since we haven't quite checked, above, if we succeeded or failed renaming
1877 # the files, we update the cached queue size from the filesystem. When we
1878     # get around to fixing the TODO above, we can use the number of actually
1879 # archived jobs to fix this.
1880 self._UpdateQueueSizeUnlocked()
1881 return len(archive_jobs)
1883 @locking.ssynchronized(_LOCK)
1885 def ArchiveJob(self, job_id):
1888 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1890 @type job_id: string
1891 @param job_id: Job ID of job to be archived.
1893 @return: Whether job was archived
1896 logging.info("Archiving job %s", job_id)
1898 job = self._LoadJobUnlocked(job_id)
1900 logging.debug("Job %s not found", job_id)
1903 return self._ArchiveJobsUnlocked([job]) == 1
1905 @locking.ssynchronized(_LOCK)
1907 def AutoArchiveJobs(self, age, timeout):
1908 """Archives all jobs based on age.
1910 The method will archive all jobs which are older than the age
1911 parameter. For jobs that don't have an end timestamp, the start
1912 timestamp will be considered. The special '-1' age will cause
1913 archival of all jobs (that are not running or queued).
1916 @param age: the minimum age in seconds
1919 logging.info("Archiving jobs with age more than %s seconds", age)
1922 end_time = now + timeout
1926 all_job_ids = self._GetJobIDsUnlocked()
1928 for idx, job_id in enumerate(all_job_ids):
1929 last_touched = idx + 1
1931 # Not optimal because jobs could be pending
1932 # TODO: Measure average duration for job archival and take number of
1933 # pending jobs into account.
1934 if time.time() > end_time:
1937 # Returns None if the job failed to load
1938 job = self._LoadJobUnlocked(job_id)
1940 if job.end_timestamp is None:
1941 if job.start_timestamp is None:
1942 job_age = job.received_timestamp
1944 job_age = job.start_timestamp
1946 job_age = job.end_timestamp
1948 if age == -1 or now - job_age[0] > age:
1951 # Archive 10 jobs at a time
1952 if len(pending) >= 10:
1953 archived_count += self._ArchiveJobsUnlocked(pending)
1957 archived_count += self._ArchiveJobsUnlocked(pending)
1959 return (archived_count, len(all_job_ids) - last_touched)
1961 def QueryJobs(self, job_ids, fields):
1962 """Returns a list of jobs in queue.
1965 @param job_ids: sequence of job identifiers or None for all
1967 @param fields: names of fields to return
1969     @return: a list with one element per job, each element being a list
1970       with the requested fields
1976 # Since files are added to/removed from the queue atomically, there's no
1977 # risk of getting the job ids in an inconsistent state.
1978 job_ids = self._GetJobIDsUnlocked()
1981 for job_id in job_ids:
1982 job = self.SafeLoadJobFromDisk(job_id)
1984 jobs.append(job.GetInfo(fields))
1990 @locking.ssynchronized(_LOCK)
1993 """Stops the job queue.
1995     This shuts down all the worker threads and closes the queue.
1998 self._wpool.TerminateWorkers()
2000 self._queue_filelock.Close()
2001 self._queue_filelock = None