Fix job queue behaviour when loading jobs
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking:
25 There's a single, large lock in the JobQueue class. It's used by all other
26 classes in this module.
27
28 """
29
30 import os
31 import logging
32 import threading
33 import errno
34 import re
35 import time
36 import weakref
37
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import opcodes
42 from ganeti import errors
43 from ganeti import mcpu
44 from ganeti import utils
45 from ganeti import jstore
46 from ganeti import rpc
47
48 from ganeti.rpc import RpcRunner
49
50 JOBQUEUE_THREADS = 25
51
52
53 def TimeStampNow():
54   return utils.SplitTime(time.time())
55
56
57 class _QueuedOpCode(object):
58   """Encasulates an opcode object.
59
60   The 'log' attribute holds the execution log and consists of tuples
61   of the form (log_serial, timestamp, level, message).
62
63   """
64   def __init__(self, op):
65     self.input = op
66     self.status = constants.OP_STATUS_QUEUED
67     self.result = None
68     self.log = []
69     self.start_timestamp = None
70     self.end_timestamp = None
71
72   @classmethod
73   def Restore(cls, state):
74     obj = _QueuedOpCode.__new__(cls)
75     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
76     obj.status = state["status"]
77     obj.result = state["result"]
78     obj.log = state["log"]
79     obj.start_timestamp = state.get("start_timestamp", None)
80     obj.end_timestamp = state.get("end_timestamp", None)
81     return obj
82
83   def Serialize(self):
84     return {
85       "input": self.input.__getstate__(),
86       "status": self.status,
87       "result": self.result,
88       "log": self.log,
89       "start_timestamp": self.start_timestamp,
90       "end_timestamp": self.end_timestamp,
91       }
92
93
94 class _QueuedJob(object):
95   """In-memory job representation.
96
97   This is what we use to track the user-submitted jobs. Locking must be taken
98   care of by users of this class.
99
100   """
101   def __init__(self, queue, job_id, ops):
102     if not ops:
103       # TODO
104       raise Exception("No opcodes")
105
106     self.queue = queue
107     self.id = job_id
108     self.ops = [_QueuedOpCode(op) for op in ops]
109     self.run_op_index = -1
110     self.log_serial = 0
111     self.received_timestamp = TimeStampNow()
112     self.start_timestamp = None
113     self.end_timestamp = None
114
115     # Condition to wait for changes
116     self.change = threading.Condition(self.queue._lock)
117
118   @classmethod
119   def Restore(cls, queue, state):
120     obj = _QueuedJob.__new__(cls)
121     obj.queue = queue
122     obj.id = state["id"]
123     obj.run_op_index = state["run_op_index"]
124     obj.received_timestamp = state.get("received_timestamp", None)
125     obj.start_timestamp = state.get("start_timestamp", None)
126     obj.end_timestamp = state.get("end_timestamp", None)
127
128     obj.ops = []
129     obj.log_serial = 0
130     for op_state in state["ops"]:
131       op = _QueuedOpCode.Restore(op_state)
132       for log_entry in op.log:
133         obj.log_serial = max(obj.log_serial, log_entry[0])
134       obj.ops.append(op)
135
136     # Condition to wait for changes
137     obj.change = threading.Condition(obj.queue._lock)
138
139     return obj
140
141   def Serialize(self):
142     return {
143       "id": self.id,
144       "ops": [op.Serialize() for op in self.ops],
145       "run_op_index": self.run_op_index,
146       "start_timestamp": self.start_timestamp,
147       "end_timestamp": self.end_timestamp,
148       "received_timestamp": self.received_timestamp,
149       }
150
151   def CalcStatus(self):
152     status = constants.JOB_STATUS_QUEUED
153
154     all_success = True
155     for op in self.ops:
156       if op.status == constants.OP_STATUS_SUCCESS:
157         continue
158
159       all_success = False
160
161       if op.status == constants.OP_STATUS_QUEUED:
162         pass
163       elif op.status == constants.OP_STATUS_WAITLOCK:
164         status = constants.JOB_STATUS_WAITLOCK
165       elif op.status == constants.OP_STATUS_RUNNING:
166         status = constants.JOB_STATUS_RUNNING
167       elif op.status == constants.OP_STATUS_ERROR:
168         status = constants.JOB_STATUS_ERROR
169         # The whole job fails if one opcode failed
170         break
171       elif op.status == constants.OP_STATUS_CANCELED:
172         status = constants.OP_STATUS_CANCELED
173         break
174
175     if all_success:
176       status = constants.JOB_STATUS_SUCCESS
177
178     return status
179
180   def GetLogEntries(self, newer_than):
181     if newer_than is None:
182       serial = -1
183     else:
184       serial = newer_than
185
186     entries = []
187     for op in self.ops:
188       entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
189
190     return entries
191
192
193 class _JobQueueWorker(workerpool.BaseWorker):
194   def _NotifyStart(self):
195     """Mark the opcode as running, not lock-waiting.
196
197     This is called from the mcpu code as a notifier function, when the
198     LU is finally about to start the Exec() method. Of course, to have
199     end-user visible results, the opcode must be initially (before
200     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
201
202     """
203     assert self.queue, "Queue attribute is missing"
204     assert self.opcode, "Opcode attribute is missing"
205
206     self.queue.acquire()
207     try:
208       self.opcode.status = constants.OP_STATUS_RUNNING
209     finally:
210       self.queue.release()
211
212   def RunTask(self, job):
213     """Job executor.
214
215     This functions processes a job. It is closely tied to the _QueuedJob and
216     _QueuedOpCode classes.
217
218     """
219     logging.debug("Worker %s processing job %s",
220                   self.worker_id, job.id)
221     proc = mcpu.Processor(self.pool.queue.context)
222     self.queue = queue = job.queue
223     try:
224       try:
225         count = len(job.ops)
226         for idx, op in enumerate(job.ops):
227           try:
228             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
229
230             queue.acquire()
231             try:
232               job.run_op_index = idx
233               op.status = constants.OP_STATUS_WAITLOCK
234               op.result = None
235               op.start_timestamp = TimeStampNow()
236               if idx == 0: # first opcode
237                 job.start_timestamp = op.start_timestamp
238               queue.UpdateJobUnlocked(job)
239
240               input_opcode = op.input
241             finally:
242               queue.release()
243
244             def _Log(*args):
245               """Append a log entry.
246
247               """
248               assert len(args) < 3
249
250               if len(args) == 1:
251                 log_type = constants.ELOG_MESSAGE
252                 log_msg = args[0]
253               else:
254                 log_type, log_msg = args
255
256               # The time is split to make serialization easier and not lose
257               # precision.
258               timestamp = utils.SplitTime(time.time())
259
260               queue.acquire()
261               try:
262                 job.log_serial += 1
263                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
264
265                 job.change.notifyAll()
266               finally:
267                 queue.release()
268
269             # Make sure not to hold lock while _Log is called
270             self.opcode = op
271             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
272
273             queue.acquire()
274             try:
275               op.status = constants.OP_STATUS_SUCCESS
276               op.result = result
277               op.end_timestamp = TimeStampNow()
278               queue.UpdateJobUnlocked(job)
279             finally:
280               queue.release()
281
282             logging.debug("Op %s/%s: Successfully finished %s",
283                           idx + 1, count, op)
284           except Exception, err:
285             queue.acquire()
286             try:
287               try:
288                 op.status = constants.OP_STATUS_ERROR
289                 op.result = str(err)
290                 op.end_timestamp = TimeStampNow()
291                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
292               finally:
293                 queue.UpdateJobUnlocked(job)
294             finally:
295               queue.release()
296             raise
297
298       except errors.GenericError, err:
299         logging.exception("Ganeti exception")
300       except:
301         logging.exception("Unhandled exception")
302     finally:
303       queue.acquire()
304       try:
305         try:
306           job.run_op_idx = -1
307           job.end_timestamp = TimeStampNow()
308           queue.UpdateJobUnlocked(job)
309         finally:
310           job_id = job.id
311           status = job.CalcStatus()
312       finally:
313         queue.release()
314       logging.debug("Worker %s finished job %s, status = %s",
315                     self.worker_id, job_id, status)
316
317
318 class _JobQueueWorkerPool(workerpool.WorkerPool):
319   def __init__(self, queue):
320     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
321                                               _JobQueueWorker)
322     self.queue = queue
323
324
325 class JobQueue(object):
326   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
327
328   def _RequireOpenQueue(fn):
329     """Decorator for "public" functions.
330
331     This function should be used for all "public" functions. That is, functions
332     usually called from other classes.
333
334     Important: Use this decorator only after utils.LockedMethod!
335
336     Example:
337       @utils.LockedMethod
338       @_RequireOpenQueue
339       def Example(self):
340         pass
341
342     """
343     def wrapper(self, *args, **kwargs):
344       assert self._queue_lock is not None, "Queue should be open"
345       return fn(self, *args, **kwargs)
346     return wrapper
347
348   def __init__(self, context):
349     self.context = context
350     self._memcache = weakref.WeakValueDictionary()
351     self._my_hostname = utils.HostInfo().name
352
353     # Locking
354     self._lock = threading.Lock()
355     self.acquire = self._lock.acquire
356     self.release = self._lock.release
357
358     # Initialize
359     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
360
361     # Read serial file
362     self._last_serial = jstore.ReadSerial()
363     assert self._last_serial is not None, ("Serial file was modified between"
364                                            " check in jstore and here")
365
366     # Get initial list of nodes
367     self._nodes = set(self.context.cfg.GetNodeList())
368
369     # Remove master node
370     try:
371       self._nodes.remove(self._my_hostname)
372     except ValueError:
373       pass
374
375     # TODO: Check consistency across nodes
376
377     # Setup worker pool
378     self._wpool = _JobQueueWorkerPool(self)
379
380     # We need to lock here because WorkerPool.AddTask() may start a job while
381     # we're still doing our work.
382     self.acquire()
383     try:
384       for job in self._GetJobsUnlocked(None):
385         # a failure in loading the job can cause 'None' to be returned
386         if job is None:
387           continue
388
389         status = job.CalcStatus()
390
391         if status in (constants.JOB_STATUS_QUEUED, ):
392           self._wpool.AddTask(job)
393
394         elif status in (constants.JOB_STATUS_RUNNING,
395                         constants.JOB_STATUS_WAITLOCK):
396           logging.warning("Unfinished job %s found: %s", job.id, job)
397           try:
398             for op in job.ops:
399               op.status = constants.OP_STATUS_ERROR
400               op.result = "Unclean master daemon shutdown"
401           finally:
402             self.UpdateJobUnlocked(job)
403     finally:
404       self.release()
405
406   @utils.LockedMethod
407   @_RequireOpenQueue
408   def AddNode(self, node_name):
409     assert node_name != self._my_hostname
410
411     # Clean queue directory on added node
412     RpcRunner.call_jobqueue_purge(node_name)
413
414     # Upload the whole queue excluding archived jobs
415     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
416
417     # Upload current serial file
418     files.append(constants.JOB_QUEUE_SERIAL_FILE)
419
420     for file_name in files:
421       # Read file content
422       fd = open(file_name, "r")
423       try:
424         content = fd.read()
425       finally:
426         fd.close()
427
428       result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
429       if not result[node_name]:
430         logging.error("Failed to upload %s to %s", file_name, node_name)
431
432     self._nodes.add(node_name)
433
434   @utils.LockedMethod
435   @_RequireOpenQueue
436   def RemoveNode(self, node_name):
437     try:
438       # The queue is removed by the "leave node" RPC call.
439       self._nodes.remove(node_name)
440     except KeyError:
441       pass
442
443   def _CheckRpcResult(self, result, nodes, failmsg):
444     failed = []
445     success = []
446
447     for node in nodes:
448       if result[node]:
449         success.append(node)
450       else:
451         failed.append(node)
452
453     if failed:
454       logging.error("%s failed on %s", failmsg, ", ".join(failed))
455
456     # +1 for the master node
457     if (len(success) + 1) < len(failed):
458       # TODO: Handle failing nodes
459       logging.error("More than half of the nodes failed")
460
461   def _WriteAndReplicateFileUnlocked(self, file_name, data):
462     """Writes a file locally and then replicates it to all nodes.
463
464     """
465     utils.WriteFile(file_name, data=data)
466
467     result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
468     self._CheckRpcResult(result, self._nodes,
469                          "Updating %s" % file_name)
470
471   def _RenameFileUnlocked(self, old, new):
472     os.rename(old, new)
473
474     result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
475     self._CheckRpcResult(result, self._nodes,
476                          "Moving %s to %s" % (old, new))
477
478   def _FormatJobID(self, job_id):
479     if not isinstance(job_id, (int, long)):
480       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
481     if job_id < 0:
482       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
483
484     return str(job_id)
485
486   def _NewSerialUnlocked(self):
487     """Generates a new job identifier.
488
489     Job identifiers are unique during the lifetime of a cluster.
490
491     Returns: A string representing the job identifier.
492
493     """
494     # New number
495     serial = self._last_serial + 1
496
497     # Write to file
498     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
499                                         "%s\n" % serial)
500
501     # Keep it only if we were able to write the file
502     self._last_serial = serial
503
504     return self._FormatJobID(serial)
505
506   @staticmethod
507   def _GetJobPath(job_id):
508     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
509
510   @staticmethod
511   def _GetArchivedJobPath(job_id):
512     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
513
514   @classmethod
515   def _ExtractJobID(cls, name):
516     m = cls._RE_JOB_FILE.match(name)
517     if m:
518       return m.group(1)
519     else:
520       return None
521
522   def _GetJobIDsUnlocked(self, archived=False):
523     """Return all known job IDs.
524
525     If the parameter archived is True, archived jobs IDs will be
526     included. Currently this argument is unused.
527
528     The method only looks at disk because it's a requirement that all
529     jobs are present on disk (so in the _memcache we don't have any
530     extra IDs).
531
532     """
533     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
534     jlist = utils.NiceSort(jlist)
535     return jlist
536
537   def _ListJobFiles(self):
538     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
539             if self._RE_JOB_FILE.match(name)]
540
541   def _LoadJobUnlocked(self, job_id):
542     job = self._memcache.get(job_id, None)
543     if job:
544       logging.debug("Found job %s in memcache", job_id)
545       return job
546
547     filepath = self._GetJobPath(job_id)
548     logging.debug("Loading job from %s", filepath)
549     try:
550       fd = open(filepath, "r")
551     except IOError, err:
552       if err.errno in (errno.ENOENT, ):
553         return None
554       raise
555     try:
556       data = serializer.LoadJson(fd.read())
557     finally:
558       fd.close()
559
560     try:
561       job = _QueuedJob.Restore(self, data)
562     except Exception, err:
563       new_path = self._GetArchivedJobPath(job_id)
564       if filepath == new_path:
565         # job already archived (future case)
566         logging.exception("Can't parse job %s", job_id)
567       else:
568         # non-archived case
569         logging.exception("Can't parse job %s, will archive.", job_id)
570         self._RenameFileUnlocked(filepath, new_path)
571       return None
572
573     self._memcache[job_id] = job
574     logging.debug("Added job %s to the cache", job_id)
575     return job
576
577   def _GetJobsUnlocked(self, job_ids):
578     if not job_ids:
579       job_ids = self._GetJobIDsUnlocked()
580
581     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
582
583   @staticmethod
584   def _IsQueueMarkedDrain():
585     """Check if the queue is marked from drain.
586
587     This currently uses the queue drain file, which makes it a
588     per-node flag. In the future this can be moved to the config file.
589
590     """
591     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
592
593   @staticmethod
594   def SetDrainFlag(drain_flag):
595     """Sets the drain flag for the queue.
596
597     This is similar to the function L{backend.JobQueueSetDrainFlag},
598     and in the future we might merge them.
599
600     """
601     if drain_flag:
602       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
603     else:
604       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
605     return True
606
607   @utils.LockedMethod
608   @_RequireOpenQueue
609   def SubmitJob(self, ops):
610     """Create and store a new job.
611
612     This enters the job into our job queue and also puts it on the new
613     queue, in order for it to be picked up by the queue processors.
614
615     @type ops: list
616     @param ops: The list of OpCodes that will become the new job.
617
618     """
619     if self._IsQueueMarkedDrain():
620       raise errors.JobQueueDrainError()
621     # Get job identifier
622     job_id = self._NewSerialUnlocked()
623     job = _QueuedJob(self, job_id, ops)
624
625     # Write to disk
626     self.UpdateJobUnlocked(job)
627
628     logging.debug("Adding new job %s to the cache", job_id)
629     self._memcache[job_id] = job
630
631     # Add to worker pool
632     self._wpool.AddTask(job)
633
634     return job.id
635
636   @_RequireOpenQueue
637   def UpdateJobUnlocked(self, job):
638     filename = self._GetJobPath(job.id)
639     data = serializer.DumpJson(job.Serialize(), indent=False)
640     logging.debug("Writing job %s to %s", job.id, filename)
641     self._WriteAndReplicateFileUnlocked(filename, data)
642
643     # Notify waiters about potential changes
644     job.change.notifyAll()
645
646   @utils.LockedMethod
647   @_RequireOpenQueue
648   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
649                         timeout):
650     """Waits for changes in a job.
651
652     @type job_id: string
653     @param job_id: Job identifier
654     @type fields: list of strings
655     @param fields: Which fields to check for changes
656     @type prev_job_info: list or None
657     @param prev_job_info: Last job information returned
658     @type prev_log_serial: int
659     @param prev_log_serial: Last job message serial number
660     @type timeout: float
661     @param timeout: maximum time to wait
662
663     """
664     logging.debug("Waiting for changes in job %s", job_id)
665     end_time = time.time() + timeout
666     while True:
667       delta_time = end_time - time.time()
668       if delta_time < 0:
669         return constants.JOB_NOTCHANGED
670
671       job = self._LoadJobUnlocked(job_id)
672       if not job:
673         logging.debug("Job %s not found", job_id)
674         break
675
676       status = job.CalcStatus()
677       job_info = self._GetJobInfoUnlocked(job, fields)
678       log_entries = job.GetLogEntries(prev_log_serial)
679
680       # Serializing and deserializing data can cause type changes (e.g. from
681       # tuple to list) or precision loss. We're doing it here so that we get
682       # the same modifications as the data received from the client. Without
683       # this, the comparison afterwards might fail without the data being
684       # significantly different.
685       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
686       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
687
688       if status not in (constants.JOB_STATUS_QUEUED,
689                         constants.JOB_STATUS_RUNNING,
690                         constants.JOB_STATUS_WAITLOCK):
691         # Don't even try to wait if the job is no longer running, there will be
692         # no changes.
693         break
694
695       if (prev_job_info != job_info or
696           (log_entries and prev_log_serial != log_entries[0][0])):
697         break
698
699       logging.debug("Waiting again")
700
701       # Release the queue lock while waiting
702       job.change.wait(delta_time)
703
704     logging.debug("Job %s changed", job_id)
705
706     return (job_info, log_entries)
707
708   @utils.LockedMethod
709   @_RequireOpenQueue
710   def CancelJob(self, job_id):
711     """Cancels a job.
712
713     @type job_id: string
714     @param job_id: Job ID of job to be cancelled.
715
716     """
717     logging.debug("Cancelling job %s", job_id)
718
719     job = self._LoadJobUnlocked(job_id)
720     if not job:
721       logging.debug("Job %s not found", job_id)
722       return
723
724     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
725       logging.debug("Job %s is no longer in the queue", job.id)
726       return
727
728     try:
729       for op in job.ops:
730         op.status = constants.OP_STATUS_ERROR
731         op.result = "Job cancelled by request"
732     finally:
733       self.UpdateJobUnlocked(job)
734
735   @_RequireOpenQueue
736   def _ArchiveJobUnlocked(self, job_id):
737     """Archives a job.
738
739     @type job_id: string
740     @param job_id: Job ID of job to be archived.
741
742     """
743     logging.info("Archiving job %s", job_id)
744
745     job = self._LoadJobUnlocked(job_id)
746     if not job:
747       logging.debug("Job %s not found", job_id)
748       return
749
750     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
751                                 constants.JOB_STATUS_SUCCESS,
752                                 constants.JOB_STATUS_ERROR):
753       logging.debug("Job %s is not yet done", job.id)
754       return
755
756     old = self._GetJobPath(job.id)
757     new = self._GetArchivedJobPath(job.id)
758
759     self._RenameFileUnlocked(old, new)
760
761     logging.debug("Successfully archived job %s", job.id)
762
763   @utils.LockedMethod
764   @_RequireOpenQueue
765   def ArchiveJob(self, job_id):
766     """Archives a job.
767
768     @type job_id: string
769     @param job_id: Job ID of job to be archived.
770
771     """
772     return self._ArchiveJobUnlocked(job_id)
773
774   @utils.LockedMethod
775   @_RequireOpenQueue
776   def AutoArchiveJobs(self, age):
777     """Archives all jobs based on age.
778
779     The method will archive all jobs which are older than the age
780     parameter. For jobs that don't have an end timestamp, the start
781     timestamp will be considered. The special '-1' age will cause
782     archival of all jobs (that are not running or queued).
783
784     @type age: int
785     @param age: the minimum age in seconds
786
787     """
788     logging.info("Archiving jobs with age more than %s seconds", age)
789
790     now = time.time()
791     for jid in self._GetJobIDsUnlocked(archived=False):
792       job = self._LoadJobUnlocked(jid)
793       if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
794                                   constants.OP_STATUS_ERROR,
795                                   constants.OP_STATUS_CANCELED):
796         continue
797       if job.end_timestamp is None:
798         if job.start_timestamp is None:
799           job_age = job.received_timestamp
800         else:
801           job_age = job.start_timestamp
802       else:
803         job_age = job.end_timestamp
804
805       if age == -1 or now - job_age[0] > age:
806         self._ArchiveJobUnlocked(jid)
807
808   def _GetJobInfoUnlocked(self, job, fields):
809     row = []
810     for fname in fields:
811       if fname == "id":
812         row.append(job.id)
813       elif fname == "status":
814         row.append(job.CalcStatus())
815       elif fname == "ops":
816         row.append([op.input.__getstate__() for op in job.ops])
817       elif fname == "opresult":
818         row.append([op.result for op in job.ops])
819       elif fname == "opstatus":
820         row.append([op.status for op in job.ops])
821       elif fname == "oplog":
822         row.append([op.log for op in job.ops])
823       elif fname == "opstart":
824         row.append([op.start_timestamp for op in job.ops])
825       elif fname == "opend":
826         row.append([op.end_timestamp for op in job.ops])
827       elif fname == "received_ts":
828         row.append(job.received_timestamp)
829       elif fname == "start_ts":
830         row.append(job.start_timestamp)
831       elif fname == "end_ts":
832         row.append(job.end_timestamp)
833       elif fname == "summary":
834         row.append([op.input.Summary() for op in job.ops])
835       else:
836         raise errors.OpExecError("Invalid job query field '%s'" % fname)
837     return row
838
839   @utils.LockedMethod
840   @_RequireOpenQueue
841   def QueryJobs(self, job_ids, fields):
842     """Returns a list of jobs in queue.
843
844     Args:
845     - job_ids: Sequence of job identifiers or None for all
846     - fields: Names of fields to return
847
848     """
849     jobs = []
850
851     for job in self._GetJobsUnlocked(job_ids):
852       if job is None:
853         jobs.append(None)
854       else:
855         jobs.append(self._GetJobInfoUnlocked(job, fields))
856
857     return jobs
858
859   @utils.LockedMethod
860   @_RequireOpenQueue
861   def Shutdown(self):
862     """Stops the job queue.
863
864     """
865     self._wpool.TerminateWorkers()
866
867     self._queue_lock.Close()
868     self._queue_lock = None