jqueue: Don't update file in MarkUnfinishedOps
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import netutils
57 from ganeti import compat
58
59
60 JOBQUEUE_THREADS = 25
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
62
63 # member lock names to be passed to @ssynchronized decorator
64 _LOCK = "_lock"
65 _QUEUE = "_queue"
66
67
68 class CancelJob(Exception):
69   """Special exception to cancel a job.
70
71   """
72
73
74 def TimeStampNow():
75   """Returns the current timestamp.
76
77   @rtype: tuple
78   @return: the current time in the (seconds, microseconds) format
79
80   """
81   return utils.SplitTime(time.time())
82
83
84 class _QueuedOpCode(object):
85   """Encapsulates an opcode object.
86
87   @ivar log: holds the execution log and consists of tuples
88   of the form C{(log_serial, timestamp, level, message)}
89   @ivar input: the OpCode we encapsulate
90   @ivar status: the current status
91   @ivar result: the result of the LU execution
92   @ivar start_timestamp: timestamp for the start of the execution
93   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94   @ivar stop_timestamp: timestamp for the end of the execution
95
96   """
97   __slots__ = ["input", "status", "result", "log",
98                "start_timestamp", "exec_timestamp", "end_timestamp",
99                "__weakref__"]
100
101   def __init__(self, op):
102     """Constructor for the _QuededOpCode.
103
104     @type op: L{opcodes.OpCode}
105     @param op: the opcode we encapsulate
106
107     """
108     self.input = op
109     self.status = constants.OP_STATUS_QUEUED
110     self.result = None
111     self.log = []
112     self.start_timestamp = None
113     self.exec_timestamp = None
114     self.end_timestamp = None
115
116   @classmethod
117   def Restore(cls, state):
118     """Restore the _QueuedOpCode from the serialized form.
119
120     @type state: dict
121     @param state: the serialized state
122     @rtype: _QueuedOpCode
123     @return: a new _QueuedOpCode instance
124
125     """
126     obj = _QueuedOpCode.__new__(cls)
127     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128     obj.status = state["status"]
129     obj.result = state["result"]
130     obj.log = state["log"]
131     obj.start_timestamp = state.get("start_timestamp", None)
132     obj.exec_timestamp = state.get("exec_timestamp", None)
133     obj.end_timestamp = state.get("end_timestamp", None)
134     return obj
135
136   def Serialize(self):
137     """Serializes this _QueuedOpCode.
138
139     @rtype: dict
140     @return: the dictionary holding the serialized state
141
142     """
143     return {
144       "input": self.input.__getstate__(),
145       "status": self.status,
146       "result": self.result,
147       "log": self.log,
148       "start_timestamp": self.start_timestamp,
149       "exec_timestamp": self.exec_timestamp,
150       "end_timestamp": self.end_timestamp,
151       }
152
153
154 class _QueuedJob(object):
155   """In-memory job representation.
156
157   This is what we use to track the user-submitted jobs. Locking must
158   be taken care of by users of this class.
159
160   @type queue: L{JobQueue}
161   @ivar queue: the parent queue
162   @ivar id: the job ID
163   @type ops: list
164   @ivar ops: the list of _QueuedOpCode that constitute the job
165   @type log_serial: int
166   @ivar log_serial: holds the index for the next log entry
167   @ivar received_timestamp: the timestamp for when the job was received
168   @ivar start_timestmap: the timestamp for start of execution
169   @ivar end_timestamp: the timestamp for end of execution
170
171   """
172   # pylint: disable-msg=W0212
173   __slots__ = ["queue", "id", "ops", "log_serial",
174                "received_timestamp", "start_timestamp", "end_timestamp",
175                "__weakref__"]
176
177   def __init__(self, queue, job_id, ops):
178     """Constructor for the _QueuedJob.
179
180     @type queue: L{JobQueue}
181     @param queue: our parent queue
182     @type job_id: job_id
183     @param job_id: our job id
184     @type ops: list
185     @param ops: the list of opcodes we hold, which will be encapsulated
186         in _QueuedOpCodes
187
188     """
189     if not ops:
190       raise errors.GenericError("A job needs at least one opcode")
191
192     self.queue = queue
193     self.id = job_id
194     self.ops = [_QueuedOpCode(op) for op in ops]
195     self.log_serial = 0
196     self.received_timestamp = TimeStampNow()
197     self.start_timestamp = None
198     self.end_timestamp = None
199
200   def __repr__(self):
201     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
202               "id=%s" % self.id,
203               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
204
205     return "<%s at %#x>" % (" ".join(status), id(self))
206
207   @classmethod
208   def Restore(cls, queue, state):
209     """Restore a _QueuedJob from serialized state:
210
211     @type queue: L{JobQueue}
212     @param queue: to which queue the restored job belongs
213     @type state: dict
214     @param state: the serialized state
215     @rtype: _JobQueue
216     @return: the restored _JobQueue instance
217
218     """
219     obj = _QueuedJob.__new__(cls)
220     obj.queue = queue
221     obj.id = state["id"]
222     obj.received_timestamp = state.get("received_timestamp", None)
223     obj.start_timestamp = state.get("start_timestamp", None)
224     obj.end_timestamp = state.get("end_timestamp", None)
225
226     obj.ops = []
227     obj.log_serial = 0
228     for op_state in state["ops"]:
229       op = _QueuedOpCode.Restore(op_state)
230       for log_entry in op.log:
231         obj.log_serial = max(obj.log_serial, log_entry[0])
232       obj.ops.append(op)
233
234     return obj
235
236   def Serialize(self):
237     """Serialize the _JobQueue instance.
238
239     @rtype: dict
240     @return: the serialized state
241
242     """
243     return {
244       "id": self.id,
245       "ops": [op.Serialize() for op in self.ops],
246       "start_timestamp": self.start_timestamp,
247       "end_timestamp": self.end_timestamp,
248       "received_timestamp": self.received_timestamp,
249       }
250
251   def CalcStatus(self):
252     """Compute the status of this job.
253
254     This function iterates over all the _QueuedOpCodes in the job and
255     based on their status, computes the job status.
256
257     The algorithm is:
258       - if we find a cancelled, or finished with error, the job
259         status will be the same
260       - otherwise, the last opcode with the status one of:
261           - waitlock
262           - canceling
263           - running
264
265         will determine the job status
266
267       - otherwise, it means either all opcodes are queued, or success,
268         and the job status will be the same
269
270     @return: the job status
271
272     """
273     status = constants.JOB_STATUS_QUEUED
274
275     all_success = True
276     for op in self.ops:
277       if op.status == constants.OP_STATUS_SUCCESS:
278         continue
279
280       all_success = False
281
282       if op.status == constants.OP_STATUS_QUEUED:
283         pass
284       elif op.status == constants.OP_STATUS_WAITLOCK:
285         status = constants.JOB_STATUS_WAITLOCK
286       elif op.status == constants.OP_STATUS_RUNNING:
287         status = constants.JOB_STATUS_RUNNING
288       elif op.status == constants.OP_STATUS_CANCELING:
289         status = constants.JOB_STATUS_CANCELING
290         break
291       elif op.status == constants.OP_STATUS_ERROR:
292         status = constants.JOB_STATUS_ERROR
293         # The whole job fails if one opcode failed
294         break
295       elif op.status == constants.OP_STATUS_CANCELED:
296         status = constants.OP_STATUS_CANCELED
297         break
298
299     if all_success:
300       status = constants.JOB_STATUS_SUCCESS
301
302     return status
303
304   def GetLogEntries(self, newer_than):
305     """Selectively returns the log entries.
306
307     @type newer_than: None or int
308     @param newer_than: if this is None, return all log entries,
309         otherwise return only the log entries with serial higher
310         than this value
311     @rtype: list
312     @return: the list of the log entries selected
313
314     """
315     if newer_than is None:
316       serial = -1
317     else:
318       serial = newer_than
319
320     entries = []
321     for op in self.ops:
322       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
323
324     return entries
325
326   def GetInfo(self, fields):
327     """Returns information about a job.
328
329     @type fields: list
330     @param fields: names of fields to return
331     @rtype: list
332     @return: list with one element for each field
333     @raise errors.OpExecError: when an invalid field
334         has been passed
335
336     """
337     row = []
338     for fname in fields:
339       if fname == "id":
340         row.append(self.id)
341       elif fname == "status":
342         row.append(self.CalcStatus())
343       elif fname == "ops":
344         row.append([op.input.__getstate__() for op in self.ops])
345       elif fname == "opresult":
346         row.append([op.result for op in self.ops])
347       elif fname == "opstatus":
348         row.append([op.status for op in self.ops])
349       elif fname == "oplog":
350         row.append([op.log for op in self.ops])
351       elif fname == "opstart":
352         row.append([op.start_timestamp for op in self.ops])
353       elif fname == "opexec":
354         row.append([op.exec_timestamp for op in self.ops])
355       elif fname == "opend":
356         row.append([op.end_timestamp for op in self.ops])
357       elif fname == "received_ts":
358         row.append(self.received_timestamp)
359       elif fname == "start_ts":
360         row.append(self.start_timestamp)
361       elif fname == "end_ts":
362         row.append(self.end_timestamp)
363       elif fname == "summary":
364         row.append([op.input.Summary() for op in self.ops])
365       else:
366         raise errors.OpExecError("Invalid self query field '%s'" % fname)
367     return row
368
369   def MarkUnfinishedOps(self, status, result):
370     """Mark unfinished opcodes with a given status and result.
371
372     This is an utility function for marking all running or waiting to
373     be run opcodes with a given status. Opcodes which are already
374     finalised are not changed.
375
376     @param status: a given opcode status
377     @param result: the opcode result
378
379     """
380     not_marked = True
381     for op in self.ops:
382       if op.status in constants.OPS_FINALIZED:
383         assert not_marked, "Finalized opcodes found after non-finalized ones"
384         continue
385       op.status = status
386       op.result = result
387       not_marked = False
388
389
390 class _OpExecCallbacks(mcpu.OpExecCbBase):
391   def __init__(self, queue, job, op):
392     """Initializes this class.
393
394     @type queue: L{JobQueue}
395     @param queue: Job queue
396     @type job: L{_QueuedJob}
397     @param job: Job object
398     @type op: L{_QueuedOpCode}
399     @param op: OpCode
400
401     """
402     assert queue, "Queue is missing"
403     assert job, "Job is missing"
404     assert op, "Opcode is missing"
405
406     self._queue = queue
407     self._job = job
408     self._op = op
409
410   def _CheckCancel(self):
411     """Raises an exception to cancel the job if asked to.
412
413     """
414     # Cancel here if we were asked to
415     if self._op.status == constants.OP_STATUS_CANCELING:
416       logging.debug("Canceling opcode")
417       raise CancelJob()
418
419   @locking.ssynchronized(_QUEUE, shared=1)
420   def NotifyStart(self):
421     """Mark the opcode as running, not lock-waiting.
422
423     This is called from the mcpu code as a notifier function, when the LU is
424     finally about to start the Exec() method. Of course, to have end-user
425     visible results, the opcode must be initially (before calling into
426     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
427
428     """
429     assert self._op in self._job.ops
430     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
431                                constants.OP_STATUS_CANCELING)
432
433     # Cancel here if we were asked to
434     self._CheckCancel()
435
436     logging.debug("Opcode is now running")
437
438     self._op.status = constants.OP_STATUS_RUNNING
439     self._op.exec_timestamp = TimeStampNow()
440
441     # And finally replicate the job status
442     self._queue.UpdateJobUnlocked(self._job)
443
444   @locking.ssynchronized(_QUEUE, shared=1)
445   def _AppendFeedback(self, timestamp, log_type, log_msg):
446     """Internal feedback append function, with locks
447
448     """
449     self._job.log_serial += 1
450     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
451     self._queue.UpdateJobUnlocked(self._job, replicate=False)
452
453   def Feedback(self, *args):
454     """Append a log entry.
455
456     """
457     assert len(args) < 3
458
459     if len(args) == 1:
460       log_type = constants.ELOG_MESSAGE
461       log_msg = args[0]
462     else:
463       (log_type, log_msg) = args
464
465     # The time is split to make serialization easier and not lose
466     # precision.
467     timestamp = utils.SplitTime(time.time())
468     self._AppendFeedback(timestamp, log_type, log_msg)
469
470   def ReportLocks(self, msg):
471     """Write locking information to the job.
472
473     Called whenever the LU processor is waiting for a lock or has acquired one.
474
475     """
476     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
477                                constants.OP_STATUS_CANCELING)
478
479     # Cancel here if we were asked to
480     self._CheckCancel()
481
482
483 class _JobChangesChecker(object):
484   def __init__(self, fields, prev_job_info, prev_log_serial):
485     """Initializes this class.
486
487     @type fields: list of strings
488     @param fields: Fields requested by LUXI client
489     @type prev_job_info: string
490     @param prev_job_info: previous job info, as passed by the LUXI client
491     @type prev_log_serial: string
492     @param prev_log_serial: previous job serial, as passed by the LUXI client
493
494     """
495     self._fields = fields
496     self._prev_job_info = prev_job_info
497     self._prev_log_serial = prev_log_serial
498
499   def __call__(self, job):
500     """Checks whether job has changed.
501
502     @type job: L{_QueuedJob}
503     @param job: Job object
504
505     """
506     status = job.CalcStatus()
507     job_info = job.GetInfo(self._fields)
508     log_entries = job.GetLogEntries(self._prev_log_serial)
509
510     # Serializing and deserializing data can cause type changes (e.g. from
511     # tuple to list) or precision loss. We're doing it here so that we get
512     # the same modifications as the data received from the client. Without
513     # this, the comparison afterwards might fail without the data being
514     # significantly different.
515     # TODO: we just deserialized from disk, investigate how to make sure that
516     # the job info and log entries are compatible to avoid this further step.
517     # TODO: Doing something like in testutils.py:UnifyValueType might be more
518     # efficient, though floats will be tricky
519     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
520     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
521
522     # Don't even try to wait if the job is no longer running, there will be
523     # no changes.
524     if (status not in (constants.JOB_STATUS_QUEUED,
525                        constants.JOB_STATUS_RUNNING,
526                        constants.JOB_STATUS_WAITLOCK) or
527         job_info != self._prev_job_info or
528         (log_entries and self._prev_log_serial != log_entries[0][0])):
529       logging.debug("Job %s changed", job.id)
530       return (job_info, log_entries)
531
532     return None
533
534
535 class _JobFileChangesWaiter(object):
536   def __init__(self, filename):
537     """Initializes this class.
538
539     @type filename: string
540     @param filename: Path to job file
541     @raises errors.InotifyError: if the notifier cannot be setup
542
543     """
544     self._wm = pyinotify.WatchManager()
545     self._inotify_handler = \
546       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
547     self._notifier = \
548       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
549     try:
550       self._inotify_handler.enable()
551     except Exception:
552       # pyinotify doesn't close file descriptors automatically
553       self._notifier.stop()
554       raise
555
556   def _OnInotify(self, notifier_enabled):
557     """Callback for inotify.
558
559     """
560     if not notifier_enabled:
561       self._inotify_handler.enable()
562
563   def Wait(self, timeout):
564     """Waits for the job file to change.
565
566     @type timeout: float
567     @param timeout: Timeout in seconds
568     @return: Whether there have been events
569
570     """
571     assert timeout >= 0
572     have_events = self._notifier.check_events(timeout * 1000)
573     if have_events:
574       self._notifier.read_events()
575     self._notifier.process_events()
576     return have_events
577
578   def Close(self):
579     """Closes underlying notifier and its file descriptor.
580
581     """
582     self._notifier.stop()
583
584
585 class _JobChangesWaiter(object):
586   def __init__(self, filename):
587     """Initializes this class.
588
589     @type filename: string
590     @param filename: Path to job file
591
592     """
593     self._filewaiter = None
594     self._filename = filename
595
596   def Wait(self, timeout):
597     """Waits for a job to change.
598
599     @type timeout: float
600     @param timeout: Timeout in seconds
601     @return: Whether there have been events
602
603     """
604     if self._filewaiter:
605       return self._filewaiter.Wait(timeout)
606
607     # Lazy setup: Avoid inotify setup cost when job file has already changed.
608     # If this point is reached, return immediately and let caller check the job
609     # file again in case there were changes since the last check. This avoids a
610     # race condition.
611     self._filewaiter = _JobFileChangesWaiter(self._filename)
612
613     return True
614
615   def Close(self):
616     """Closes underlying waiter.
617
618     """
619     if self._filewaiter:
620       self._filewaiter.Close()
621
622
623 class _WaitForJobChangesHelper(object):
624   """Helper class using inotify to wait for changes in a job file.
625
626   This class takes a previous job status and serial, and alerts the client when
627   the current job status has changed.
628
629   """
630   @staticmethod
631   def _CheckForChanges(job_load_fn, check_fn):
632     job = job_load_fn()
633     if not job:
634       raise errors.JobLost()
635
636     result = check_fn(job)
637     if result is None:
638       raise utils.RetryAgain()
639
640     return result
641
642   def __call__(self, filename, job_load_fn,
643                fields, prev_job_info, prev_log_serial, timeout):
644     """Waits for changes on a job.
645
646     @type filename: string
647     @param filename: File on which to wait for changes
648     @type job_load_fn: callable
649     @param job_load_fn: Function to load job
650     @type fields: list of strings
651     @param fields: Which fields to check for changes
652     @type prev_job_info: list or None
653     @param prev_job_info: Last job information returned
654     @type prev_log_serial: int
655     @param prev_log_serial: Last job message serial number
656     @type timeout: float
657     @param timeout: maximum time to wait in seconds
658
659     """
660     try:
661       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
662       waiter = _JobChangesWaiter(filename)
663       try:
664         return utils.Retry(compat.partial(self._CheckForChanges,
665                                           job_load_fn, check_fn),
666                            utils.RETRY_REMAINING_TIME, timeout,
667                            wait_fn=waiter.Wait)
668       finally:
669         waiter.Close()
670     except (errors.InotifyError, errors.JobLost):
671       return None
672     except utils.RetryTimeout:
673       return constants.JOB_NOTCHANGED
674
675
676 def _EncodeOpError(err):
677   """Encodes an error which occurred while processing an opcode.
678
679   """
680   if isinstance(err, errors.GenericError):
681     to_encode = err
682   else:
683     to_encode = errors.OpExecError(str(err))
684
685   return errors.EncodeException(to_encode)
686
687
688 class _JobQueueWorker(workerpool.BaseWorker):
689   """The actual job workers.
690
691   """
692   def RunTask(self, job): # pylint: disable-msg=W0221
693     """Job executor.
694
695     This functions processes a job. It is closely tied to the _QueuedJob and
696     _QueuedOpCode classes.
697
698     @type job: L{_QueuedJob}
699     @param job: the job to be processed
700
701     """
702     self.SetTaskName("Job%s" % job.id)
703
704     logging.info("Processing job %s", job.id)
705     proc = mcpu.Processor(self.pool.queue.context, job.id)
706     queue = job.queue
707     try:
708       try:
709         count = len(job.ops)
710         for idx, op in enumerate(job.ops):
711           op_summary = op.input.Summary()
712           if op.status == constants.OP_STATUS_SUCCESS:
713             # this is a job that was partially completed before master
714             # daemon shutdown, so it can be expected that some opcodes
715             # are already completed successfully (if any did error
716             # out, then the whole job should have been aborted and not
717             # resubmitted for processing)
718             logging.info("Op %s/%s: opcode %s already processed, skipping",
719                          idx + 1, count, op_summary)
720             continue
721           try:
722             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
723                          op_summary)
724
725             queue.acquire(shared=1)
726             try:
727               if op.status == constants.OP_STATUS_CANCELED:
728                 logging.debug("Canceling opcode")
729                 raise CancelJob()
730               assert op.status == constants.OP_STATUS_QUEUED
731               logging.debug("Opcode %s/%s waiting for locks",
732                             idx + 1, count)
733               op.status = constants.OP_STATUS_WAITLOCK
734               op.result = None
735               op.start_timestamp = TimeStampNow()
736               if idx == 0: # first opcode
737                 job.start_timestamp = op.start_timestamp
738               queue.UpdateJobUnlocked(job)
739
740               input_opcode = op.input
741             finally:
742               queue.release()
743
744             # Make sure not to hold queue lock while calling ExecOpCode
745             result = proc.ExecOpCode(input_opcode,
746                                      _OpExecCallbacks(queue, job, op))
747
748             queue.acquire(shared=1)
749             try:
750               logging.debug("Opcode %s/%s succeeded", idx + 1, count)
751               op.status = constants.OP_STATUS_SUCCESS
752               op.result = result
753               op.end_timestamp = TimeStampNow()
754               if idx == count - 1:
755                 job.end_timestamp = TimeStampNow()
756
757                 # Consistency check
758                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
759                                   for i in job.ops)
760
761               queue.UpdateJobUnlocked(job)
762             finally:
763               queue.release()
764
765             logging.info("Op %s/%s: Successfully finished opcode %s",
766                          idx + 1, count, op_summary)
767           except CancelJob:
768             # Will be handled further up
769             raise
770           except Exception, err:
771             queue.acquire(shared=1)
772             try:
773               try:
774                 logging.debug("Opcode %s/%s failed", idx + 1, count)
775                 op.status = constants.OP_STATUS_ERROR
776                 op.result = _EncodeOpError(err)
777                 op.end_timestamp = TimeStampNow()
778                 logging.info("Op %s/%s: Error in opcode %s: %s",
779                              idx + 1, count, op_summary, err)
780
781                 to_encode = errors.OpExecError("Preceding opcode failed")
782                 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
783                                       _EncodeOpError(to_encode))
784
785                 # Consistency check
786                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
787                                   for i in job.ops[:idx])
788                 assert compat.all(i.status == constants.OP_STATUS_ERROR and
789                                   errors.GetEncodedError(i.result)
790                                   for i in job.ops[idx:])
791               finally:
792                 job.end_timestamp = TimeStampNow()
793                 queue.UpdateJobUnlocked(job)
794             finally:
795               queue.release()
796             raise
797
798       except CancelJob:
799         queue.acquire(shared=1)
800         try:
801           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
802                                 "Job canceled by request")
803           job.end_timestamp = TimeStampNow()
804           queue.UpdateJobUnlocked(job)
805         finally:
806           queue.release()
807       except errors.GenericError, err:
808         logging.exception("Ganeti exception")
809       except:
810         logging.exception("Unhandled exception")
811     finally:
812       status = job.CalcStatus()
813       logging.info("Finished job %s, status = %s", job.id, status)
814
815
816 class _JobQueueWorkerPool(workerpool.WorkerPool):
817   """Simple class implementing a job-processing workerpool.
818
819   """
820   def __init__(self, queue):
821     super(_JobQueueWorkerPool, self).__init__("JobQueue",
822                                               JOBQUEUE_THREADS,
823                                               _JobQueueWorker)
824     self.queue = queue
825
826
827 def _RequireOpenQueue(fn):
828   """Decorator for "public" functions.
829
830   This function should be used for all 'public' functions. That is,
831   functions usually called from other classes. Note that this should
832   be applied only to methods (not plain functions), since it expects
833   that the decorated function is called with a first argument that has
834   a '_queue_filelock' argument.
835
836   @warning: Use this decorator only after locking.ssynchronized
837
838   Example::
839     @locking.ssynchronized(_LOCK)
840     @_RequireOpenQueue
841     def Example(self):
842       pass
843
844   """
845   def wrapper(self, *args, **kwargs):
846     # pylint: disable-msg=W0212
847     assert self._queue_filelock is not None, "Queue should be open"
848     return fn(self, *args, **kwargs)
849   return wrapper
850
851
852 class JobQueue(object):
853   """Queue used to manage the jobs.
854
855   @cvar _RE_JOB_FILE: regex matching the valid job file names
856
857   """
858   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
859
860   def __init__(self, context):
861     """Constructor for JobQueue.
862
863     The constructor will initialize the job queue object and then
864     start loading the current jobs from disk, either for starting them
865     (if they were queue) or for aborting them (if they were already
866     running).
867
868     @type context: GanetiContext
869     @param context: the context object for access to the configuration
870         data and other ganeti objects
871
872     """
873     self.context = context
874     self._memcache = weakref.WeakValueDictionary()
875     self._my_hostname = netutils.HostInfo().name
876
877     # The Big JobQueue lock. If a code block or method acquires it in shared
878     # mode safe it must guarantee concurrency with all the code acquiring it in
879     # shared mode, including itself. In order not to acquire it at all
880     # concurrency must be guaranteed with all code acquiring it in shared mode
881     # and all code acquiring it exclusively.
882     self._lock = locking.SharedLock("JobQueue")
883
884     self.acquire = self._lock.acquire
885     self.release = self._lock.release
886
887     # Initialize the queue, and acquire the filelock.
888     # This ensures no other process is working on the job queue.
889     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
890
891     # Read serial file
892     self._last_serial = jstore.ReadSerial()
893     assert self._last_serial is not None, ("Serial file was modified between"
894                                            " check in jstore and here")
895
896     # Get initial list of nodes
897     self._nodes = dict((n.name, n.primary_ip)
898                        for n in self.context.cfg.GetAllNodesInfo().values()
899                        if n.master_candidate)
900
901     # Remove master node
902     self._nodes.pop(self._my_hostname, None)
903
904     # TODO: Check consistency across nodes
905
906     self._queue_size = 0
907     self._UpdateQueueSizeUnlocked()
908     self._drained = self._IsQueueMarkedDrain()
909
910     # Setup worker pool
911     self._wpool = _JobQueueWorkerPool(self)
912     try:
913       # We need to lock here because WorkerPool.AddTask() may start a job while
914       # we're still doing our work.
915       self.acquire()
916       try:
917         logging.info("Inspecting job queue")
918
919         all_job_ids = self._GetJobIDsUnlocked()
920         jobs_count = len(all_job_ids)
921         lastinfo = time.time()
922         for idx, job_id in enumerate(all_job_ids):
923           # Give an update every 1000 jobs or 10 seconds
924           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
925               idx == (jobs_count - 1)):
926             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
927                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
928             lastinfo = time.time()
929
930           job = self._LoadJobUnlocked(job_id)
931
932           # a failure in loading the job can cause 'None' to be returned
933           if job is None:
934             continue
935
936           status = job.CalcStatus()
937
938           if status in (constants.JOB_STATUS_QUEUED, ):
939             self._wpool.AddTask((job, ))
940
941           elif status in (constants.JOB_STATUS_RUNNING,
942                           constants.JOB_STATUS_WAITLOCK,
943                           constants.JOB_STATUS_CANCELING):
944             logging.warning("Unfinished job %s found: %s", job.id, job)
945             job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
946                                   "Unclean master daemon shutdown")
947             self.UpdateJobUnlocked(job)
948
949         logging.info("Job queue inspection finished")
950       finally:
951         self.release()
952     except:
953       self._wpool.TerminateWorkers()
954       raise
955
956   @locking.ssynchronized(_LOCK)
957   @_RequireOpenQueue
958   def AddNode(self, node):
959     """Register a new node with the queue.
960
961     @type node: L{objects.Node}
962     @param node: the node object to be added
963
964     """
965     node_name = node.name
966     assert node_name != self._my_hostname
967
968     # Clean queue directory on added node
969     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
970     msg = result.fail_msg
971     if msg:
972       logging.warning("Cannot cleanup queue directory on node %s: %s",
973                       node_name, msg)
974
975     if not node.master_candidate:
976       # remove if existing, ignoring errors
977       self._nodes.pop(node_name, None)
978       # and skip the replication of the job ids
979       return
980
981     # Upload the whole queue excluding archived jobs
982     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
983
984     # Upload current serial file
985     files.append(constants.JOB_QUEUE_SERIAL_FILE)
986
987     for file_name in files:
988       # Read file content
989       content = utils.ReadFile(file_name)
990
991       result = rpc.RpcRunner.call_jobqueue_update([node_name],
992                                                   [node.primary_ip],
993                                                   file_name, content)
994       msg = result[node_name].fail_msg
995       if msg:
996         logging.error("Failed to upload file %s to node %s: %s",
997                       file_name, node_name, msg)
998
999     self._nodes[node_name] = node.primary_ip
1000
1001   @locking.ssynchronized(_LOCK)
1002   @_RequireOpenQueue
1003   def RemoveNode(self, node_name):
1004     """Callback called when removing nodes from the cluster.
1005
1006     @type node_name: str
1007     @param node_name: the name of the node to remove
1008
1009     """
1010     self._nodes.pop(node_name, None)
1011
1012   @staticmethod
1013   def _CheckRpcResult(result, nodes, failmsg):
1014     """Verifies the status of an RPC call.
1015
1016     Since we aim to keep consistency should this node (the current
1017     master) fail, we will log errors if our rpc fail, and especially
1018     log the case when more than half of the nodes fails.
1019
1020     @param result: the data as returned from the rpc call
1021     @type nodes: list
1022     @param nodes: the list of nodes we made the call to
1023     @type failmsg: str
1024     @param failmsg: the identifier to be used for logging
1025
1026     """
1027     failed = []
1028     success = []
1029
1030     for node in nodes:
1031       msg = result[node].fail_msg
1032       if msg:
1033         failed.append(node)
1034         logging.error("RPC call %s (%s) failed on node %s: %s",
1035                       result[node].call, failmsg, node, msg)
1036       else:
1037         success.append(node)
1038
1039     # +1 for the master node
1040     if (len(success) + 1) < len(failed):
1041       # TODO: Handle failing nodes
1042       logging.error("More than half of the nodes failed")
1043
1044   def _GetNodeIp(self):
1045     """Helper for returning the node name/ip list.
1046
1047     @rtype: (list, list)
1048     @return: a tuple of two lists, the first one with the node
1049         names and the second one with the node addresses
1050
1051     """
1052     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1053     name_list = self._nodes.keys()
1054     addr_list = [self._nodes[name] for name in name_list]
1055     return name_list, addr_list
1056
1057   def _UpdateJobQueueFile(self, file_name, data, replicate):
1058     """Writes a file locally and then replicates it to all nodes.
1059
1060     This function will replace the contents of a file on the local
1061     node and then replicate it to all the other nodes we have.
1062
1063     @type file_name: str
1064     @param file_name: the path of the file to be replicated
1065     @type data: str
1066     @param data: the new contents of the file
1067     @type replicate: boolean
1068     @param replicate: whether to spread the changes to the remote nodes
1069
1070     """
1071     utils.WriteFile(file_name, data=data)
1072
1073     if replicate:
1074       names, addrs = self._GetNodeIp()
1075       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1076       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1077
1078   def _RenameFilesUnlocked(self, rename):
1079     """Renames a file locally and then replicate the change.
1080
1081     This function will rename a file in the local queue directory
1082     and then replicate this rename to all the other nodes we have.
1083
1084     @type rename: list of (old, new)
1085     @param rename: List containing tuples mapping old to new names
1086
1087     """
1088     # Rename them locally
1089     for old, new in rename:
1090       utils.RenameFile(old, new, mkdir=True)
1091
1092     # ... and on all nodes
1093     names, addrs = self._GetNodeIp()
1094     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1095     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1096
1097   @staticmethod
1098   def _FormatJobID(job_id):
1099     """Convert a job ID to string format.
1100
1101     Currently this just does C{str(job_id)} after performing some
1102     checks, but if we want to change the job id format this will
1103     abstract this change.
1104
1105     @type job_id: int or long
1106     @param job_id: the numeric job id
1107     @rtype: str
1108     @return: the formatted job id
1109
1110     """
1111     if not isinstance(job_id, (int, long)):
1112       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1113     if job_id < 0:
1114       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1115
1116     return str(job_id)
1117
1118   @classmethod
1119   def _GetArchiveDirectory(cls, job_id):
1120     """Returns the archive directory for a job.
1121
1122     @type job_id: str
1123     @param job_id: Job identifier
1124     @rtype: str
1125     @return: Directory name
1126
1127     """
1128     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1129
1130   def _NewSerialsUnlocked(self, count):
1131     """Generates a new job identifier.
1132
1133     Job identifiers are unique during the lifetime of a cluster.
1134
1135     @type count: integer
1136     @param count: how many serials to return
1137     @rtype: str
1138     @return: a string representing the job identifier.
1139
1140     """
1141     assert count > 0
1142     # New number
1143     serial = self._last_serial + count
1144
1145     # Write to file
1146     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1147                              "%s\n" % serial, True)
1148
1149     result = [self._FormatJobID(v)
1150               for v in range(self._last_serial, serial + 1)]
1151     # Keep it only if we were able to write the file
1152     self._last_serial = serial
1153
1154     return result
1155
1156   @staticmethod
1157   def _GetJobPath(job_id):
1158     """Returns the job file for a given job id.
1159
1160     @type job_id: str
1161     @param job_id: the job identifier
1162     @rtype: str
1163     @return: the path to the job file
1164
1165     """
1166     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1167
1168   @classmethod
1169   def _GetArchivedJobPath(cls, job_id):
1170     """Returns the archived job file for a give job id.
1171
1172     @type job_id: str
1173     @param job_id: the job identifier
1174     @rtype: str
1175     @return: the path to the archived job file
1176
1177     """
1178     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1179                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1180
1181   def _GetJobIDsUnlocked(self, sort=True):
1182     """Return all known job IDs.
1183
1184     The method only looks at disk because it's a requirement that all
1185     jobs are present on disk (so in the _memcache we don't have any
1186     extra IDs).
1187
1188     @type sort: boolean
1189     @param sort: perform sorting on the returned job ids
1190     @rtype: list
1191     @return: the list of job IDs
1192
1193     """
1194     jlist = []
1195     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1196       m = self._RE_JOB_FILE.match(filename)
1197       if m:
1198         jlist.append(m.group(1))
1199     if sort:
1200       jlist = utils.NiceSort(jlist)
1201     return jlist
1202
1203   def _LoadJobUnlocked(self, job_id):
1204     """Loads a job from the disk or memory.
1205
1206     Given a job id, this will return the cached job object if
1207     existing, or try to load the job from the disk. If loading from
1208     disk, it will also add the job to the cache.
1209
1210     @param job_id: the job id
1211     @rtype: L{_QueuedJob} or None
1212     @return: either None or the job object
1213
1214     """
1215     job = self._memcache.get(job_id, None)
1216     if job:
1217       logging.debug("Found job %s in memcache", job_id)
1218       return job
1219
1220     try:
1221       job = self._LoadJobFromDisk(job_id)
1222       if job is None:
1223         return job
1224     except errors.JobFileCorrupted:
1225       old_path = self._GetJobPath(job_id)
1226       new_path = self._GetArchivedJobPath(job_id)
1227       if old_path == new_path:
1228         # job already archived (future case)
1229         logging.exception("Can't parse job %s", job_id)
1230       else:
1231         # non-archived case
1232         logging.exception("Can't parse job %s, will archive.", job_id)
1233         self._RenameFilesUnlocked([(old_path, new_path)])
1234       return None
1235
1236     self._memcache[job_id] = job
1237     logging.debug("Added job %s to the cache", job_id)
1238     return job
1239
1240   def _LoadJobFromDisk(self, job_id):
1241     """Load the given job file from disk.
1242
1243     Given a job file, read, load and restore it in a _QueuedJob format.
1244
1245     @type job_id: string
1246     @param job_id: job identifier
1247     @rtype: L{_QueuedJob} or None
1248     @return: either None or the job object
1249
1250     """
1251     filepath = self._GetJobPath(job_id)
1252     logging.debug("Loading job from %s", filepath)
1253     try:
1254       raw_data = utils.ReadFile(filepath)
1255     except EnvironmentError, err:
1256       if err.errno in (errno.ENOENT, ):
1257         return None
1258       raise
1259
1260     try:
1261       data = serializer.LoadJson(raw_data)
1262       job = _QueuedJob.Restore(self, data)
1263     except Exception, err: # pylint: disable-msg=W0703
1264       raise errors.JobFileCorrupted(err)
1265
1266     return job
1267
1268   def SafeLoadJobFromDisk(self, job_id):
1269     """Load the given job file from disk.
1270
1271     Given a job file, read, load and restore it in a _QueuedJob format.
1272     In case of error reading the job, it gets returned as None, and the
1273     exception is logged.
1274
1275     @type job_id: string
1276     @param job_id: job identifier
1277     @rtype: L{_QueuedJob} or None
1278     @return: either None or the job object
1279
1280     """
1281     try:
1282       return self._LoadJobFromDisk(job_id)
1283     except (errors.JobFileCorrupted, EnvironmentError):
1284       logging.exception("Can't load/parse job %s", job_id)
1285       return None
1286
1287   @staticmethod
1288   def _IsQueueMarkedDrain():
1289     """Check if the queue is marked from drain.
1290
1291     This currently uses the queue drain file, which makes it a
1292     per-node flag. In the future this can be moved to the config file.
1293
1294     @rtype: boolean
1295     @return: True of the job queue is marked for draining
1296
1297     """
1298     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1299
1300   def _UpdateQueueSizeUnlocked(self):
1301     """Update the queue size.
1302
1303     """
1304     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1305
1306   @locking.ssynchronized(_LOCK)
1307   @_RequireOpenQueue
1308   def SetDrainFlag(self, drain_flag):
1309     """Sets the drain flag for the queue.
1310
1311     @type drain_flag: boolean
1312     @param drain_flag: Whether to set or unset the drain flag
1313
1314     """
1315     if drain_flag:
1316       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1317     else:
1318       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1319
1320     self._drained = drain_flag
1321
1322     return True
1323
1324   @_RequireOpenQueue
1325   def _SubmitJobUnlocked(self, job_id, ops):
1326     """Create and store a new job.
1327
1328     This enters the job into our job queue and also puts it on the new
1329     queue, in order for it to be picked up by the queue processors.
1330
1331     @type job_id: job ID
1332     @param job_id: the job ID for the new job
1333     @type ops: list
1334     @param ops: The list of OpCodes that will become the new job.
1335     @rtype: L{_QueuedJob}
1336     @return: the job object to be queued
1337     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1338     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1339
1340     """
1341     # Ok when sharing the big job queue lock, as the drain file is created when
1342     # the lock is exclusive.
1343     if self._drained:
1344       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1345
1346     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1347       raise errors.JobQueueFull()
1348
1349     job = _QueuedJob(self, job_id, ops)
1350
1351     # Write to disk
1352     self.UpdateJobUnlocked(job)
1353
1354     self._queue_size += 1
1355
1356     logging.debug("Adding new job %s to the cache", job_id)
1357     self._memcache[job_id] = job
1358
1359     return job
1360
1361   @locking.ssynchronized(_LOCK)
1362   @_RequireOpenQueue
1363   def SubmitJob(self, ops):
1364     """Create and store a new job.
1365
1366     @see: L{_SubmitJobUnlocked}
1367
1368     """
1369     job_id = self._NewSerialsUnlocked(1)[0]
1370     self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1371     return job_id
1372
1373   @locking.ssynchronized(_LOCK)
1374   @_RequireOpenQueue
1375   def SubmitManyJobs(self, jobs):
1376     """Create and store multiple jobs.
1377
1378     @see: L{_SubmitJobUnlocked}
1379
1380     """
1381     results = []
1382     tasks = []
1383     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1384     for job_id, ops in zip(all_job_ids, jobs):
1385       try:
1386         tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1387         status = True
1388         data = job_id
1389       except errors.GenericError, err:
1390         data = str(err)
1391         status = False
1392       results.append((status, data))
1393     self._wpool.AddManyTasks(tasks)
1394
1395     return results
1396
1397   @_RequireOpenQueue
1398   def UpdateJobUnlocked(self, job, replicate=True):
1399     """Update a job's on disk storage.
1400
1401     After a job has been modified, this function needs to be called in
1402     order to write the changes to disk and replicate them to the other
1403     nodes.
1404
1405     @type job: L{_QueuedJob}
1406     @param job: the changed job
1407     @type replicate: boolean
1408     @param replicate: whether to replicate the change to remote nodes
1409
1410     """
1411     filename = self._GetJobPath(job.id)
1412     data = serializer.DumpJson(job.Serialize(), indent=False)
1413     logging.debug("Writing job %s to %s", job.id, filename)
1414     self._UpdateJobQueueFile(filename, data, replicate)
1415
1416   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1417                         timeout):
1418     """Waits for changes in a job.
1419
1420     @type job_id: string
1421     @param job_id: Job identifier
1422     @type fields: list of strings
1423     @param fields: Which fields to check for changes
1424     @type prev_job_info: list or None
1425     @param prev_job_info: Last job information returned
1426     @type prev_log_serial: int
1427     @param prev_log_serial: Last job message serial number
1428     @type timeout: float
1429     @param timeout: maximum time to wait in seconds
1430     @rtype: tuple (job info, log entries)
1431     @return: a tuple of the job information as required via
1432         the fields parameter, and the log entries as a list
1433
1434         if the job has not changed and the timeout has expired,
1435         we instead return a special value,
1436         L{constants.JOB_NOTCHANGED}, which should be interpreted
1437         as such by the clients
1438
1439     """
1440     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1441
1442     helper = _WaitForJobChangesHelper()
1443
1444     return helper(self._GetJobPath(job_id), load_fn,
1445                   fields, prev_job_info, prev_log_serial, timeout)
1446
1447   @locking.ssynchronized(_LOCK)
1448   @_RequireOpenQueue
1449   def CancelJob(self, job_id):
1450     """Cancels a job.
1451
1452     This will only succeed if the job has not started yet.
1453
1454     @type job_id: string
1455     @param job_id: job ID of job to be cancelled.
1456
1457     """
1458     logging.info("Cancelling job %s", job_id)
1459
1460     job = self._LoadJobUnlocked(job_id)
1461     if not job:
1462       logging.debug("Job %s not found", job_id)
1463       return (False, "Job %s not found" % job_id)
1464
1465     job_status = job.CalcStatus()
1466
1467     if job_status not in (constants.JOB_STATUS_QUEUED,
1468                           constants.JOB_STATUS_WAITLOCK):
1469       logging.debug("Job %s is no longer waiting in the queue", job.id)
1470       return (False, "Job %s is no longer waiting in the queue" % job.id)
1471
1472     if job_status == constants.JOB_STATUS_QUEUED:
1473       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1474                             "Job canceled by request")
1475       msg = "Job %s canceled" % job.id
1476
1477     elif job_status == constants.JOB_STATUS_WAITLOCK:
1478       # The worker will notice the new status and cancel the job
1479       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1480       msg = "Job %s will be canceled" % job.id
1481
1482     self.UpdateJobUnlocked(job)
1483
1484     return (True, msg)
1485
1486   @_RequireOpenQueue
1487   def _ArchiveJobsUnlocked(self, jobs):
1488     """Archives jobs.
1489
1490     @type jobs: list of L{_QueuedJob}
1491     @param jobs: Job objects
1492     @rtype: int
1493     @return: Number of archived jobs
1494
1495     """
1496     archive_jobs = []
1497     rename_files = []
1498     for job in jobs:
1499       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1500         logging.debug("Job %s is not yet done", job.id)
1501         continue
1502
1503       archive_jobs.append(job)
1504
1505       old = self._GetJobPath(job.id)
1506       new = self._GetArchivedJobPath(job.id)
1507       rename_files.append((old, new))
1508
1509     # TODO: What if 1..n files fail to rename?
1510     self._RenameFilesUnlocked(rename_files)
1511
1512     logging.debug("Successfully archived job(s) %s",
1513                   utils.CommaJoin(job.id for job in archive_jobs))
1514
1515     # Since we haven't quite checked, above, if we succeeded or failed renaming
1516     # the files, we update the cached queue size from the filesystem. When we
1517     # get around to fix the TODO: above, we can use the number of actually
1518     # archived jobs to fix this.
1519     self._UpdateQueueSizeUnlocked()
1520     return len(archive_jobs)
1521
1522   @locking.ssynchronized(_LOCK)
1523   @_RequireOpenQueue
1524   def ArchiveJob(self, job_id):
1525     """Archives a job.
1526
1527     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1528
1529     @type job_id: string
1530     @param job_id: Job ID of job to be archived.
1531     @rtype: bool
1532     @return: Whether job was archived
1533
1534     """
1535     logging.info("Archiving job %s", job_id)
1536
1537     job = self._LoadJobUnlocked(job_id)
1538     if not job:
1539       logging.debug("Job %s not found", job_id)
1540       return False
1541
1542     return self._ArchiveJobsUnlocked([job]) == 1
1543
1544   @locking.ssynchronized(_LOCK)
1545   @_RequireOpenQueue
1546   def AutoArchiveJobs(self, age, timeout):
1547     """Archives all jobs based on age.
1548
1549     The method will archive all jobs which are older than the age
1550     parameter. For jobs that don't have an end timestamp, the start
1551     timestamp will be considered. The special '-1' age will cause
1552     archival of all jobs (that are not running or queued).
1553
1554     @type age: int
1555     @param age: the minimum age in seconds
1556
1557     """
1558     logging.info("Archiving jobs with age more than %s seconds", age)
1559
1560     now = time.time()
1561     end_time = now + timeout
1562     archived_count = 0
1563     last_touched = 0
1564
1565     all_job_ids = self._GetJobIDsUnlocked()
1566     pending = []
1567     for idx, job_id in enumerate(all_job_ids):
1568       last_touched = idx + 1
1569
1570       # Not optimal because jobs could be pending
1571       # TODO: Measure average duration for job archival and take number of
1572       # pending jobs into account.
1573       if time.time() > end_time:
1574         break
1575
1576       # Returns None if the job failed to load
1577       job = self._LoadJobUnlocked(job_id)
1578       if job:
1579         if job.end_timestamp is None:
1580           if job.start_timestamp is None:
1581             job_age = job.received_timestamp
1582           else:
1583             job_age = job.start_timestamp
1584         else:
1585           job_age = job.end_timestamp
1586
1587         if age == -1 or now - job_age[0] > age:
1588           pending.append(job)
1589
1590           # Archive 10 jobs at a time
1591           if len(pending) >= 10:
1592             archived_count += self._ArchiveJobsUnlocked(pending)
1593             pending = []
1594
1595     if pending:
1596       archived_count += self._ArchiveJobsUnlocked(pending)
1597
1598     return (archived_count, len(all_job_ids) - last_touched)
1599
1600   def QueryJobs(self, job_ids, fields):
1601     """Returns a list of jobs in queue.
1602
1603     @type job_ids: list
1604     @param job_ids: sequence of job identifiers or None for all
1605     @type fields: list
1606     @param fields: names of fields to return
1607     @rtype: list
1608     @return: list one element per job, each element being list with
1609         the requested fields
1610
1611     """
1612     jobs = []
1613     list_all = False
1614     if not job_ids:
1615       # Since files are added to/removed from the queue atomically, there's no
1616       # risk of getting the job ids in an inconsistent state.
1617       job_ids = self._GetJobIDsUnlocked()
1618       list_all = True
1619
1620     for job_id in job_ids:
1621       job = self.SafeLoadJobFromDisk(job_id)
1622       if job is not None:
1623         jobs.append(job.GetInfo(fields))
1624       elif not list_all:
1625         jobs.append(None)
1626
1627     return jobs
1628
1629   @locking.ssynchronized(_LOCK)
1630   @_RequireOpenQueue
1631   def Shutdown(self):
1632     """Stops the job queue.
1633
1634     This shutdowns all the worker threads an closes the queue.
1635
1636     """
1637     self._wpool.TerminateWorkers()
1638
1639     self._queue_filelock.Close()
1640     self._queue_filelock = None