Convert the job queue rpcs to address-based
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking:
25 There's a single, large lock in the JobQueue class. It's used by all other
26 classes in this module.
27
28 """
29
30 import os
31 import logging
32 import threading
33 import errno
34 import re
35 import time
36 import weakref
37
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import opcodes
42 from ganeti import errors
43 from ganeti import mcpu
44 from ganeti import utils
45 from ganeti import jstore
46 from ganeti import rpc
47
48 from ganeti.rpc import RpcRunner
49
50 JOBQUEUE_THREADS = 25
51
52
53 def TimeStampNow():
54   return utils.SplitTime(time.time())
55
56
57 class _QueuedOpCode(object):
58   """Encasulates an opcode object.
59
60   The 'log' attribute holds the execution log and consists of tuples
61   of the form (log_serial, timestamp, level, message).
62
63   """
64   def __init__(self, op):
65     self.input = op
66     self.status = constants.OP_STATUS_QUEUED
67     self.result = None
68     self.log = []
69     self.start_timestamp = None
70     self.end_timestamp = None
71
72   @classmethod
73   def Restore(cls, state):
74     obj = _QueuedOpCode.__new__(cls)
75     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
76     obj.status = state["status"]
77     obj.result = state["result"]
78     obj.log = state["log"]
79     obj.start_timestamp = state.get("start_timestamp", None)
80     obj.end_timestamp = state.get("end_timestamp", None)
81     return obj
82
83   def Serialize(self):
84     return {
85       "input": self.input.__getstate__(),
86       "status": self.status,
87       "result": self.result,
88       "log": self.log,
89       "start_timestamp": self.start_timestamp,
90       "end_timestamp": self.end_timestamp,
91       }
92
93
94 class _QueuedJob(object):
95   """In-memory job representation.
96
97   This is what we use to track the user-submitted jobs. Locking must be taken
98   care of by users of this class.
99
100   """
101   def __init__(self, queue, job_id, ops):
102     if not ops:
103       # TODO
104       raise Exception("No opcodes")
105
106     self.queue = queue
107     self.id = job_id
108     self.ops = [_QueuedOpCode(op) for op in ops]
109     self.run_op_index = -1
110     self.log_serial = 0
111     self.received_timestamp = TimeStampNow()
112     self.start_timestamp = None
113     self.end_timestamp = None
114
115     # Condition to wait for changes
116     self.change = threading.Condition(self.queue._lock)
117
118   @classmethod
119   def Restore(cls, queue, state):
120     obj = _QueuedJob.__new__(cls)
121     obj.queue = queue
122     obj.id = state["id"]
123     obj.run_op_index = state["run_op_index"]
124     obj.received_timestamp = state.get("received_timestamp", None)
125     obj.start_timestamp = state.get("start_timestamp", None)
126     obj.end_timestamp = state.get("end_timestamp", None)
127
128     obj.ops = []
129     obj.log_serial = 0
130     for op_state in state["ops"]:
131       op = _QueuedOpCode.Restore(op_state)
132       for log_entry in op.log:
133         obj.log_serial = max(obj.log_serial, log_entry[0])
134       obj.ops.append(op)
135
136     # Condition to wait for changes
137     obj.change = threading.Condition(obj.queue._lock)
138
139     return obj
140
141   def Serialize(self):
142     return {
143       "id": self.id,
144       "ops": [op.Serialize() for op in self.ops],
145       "run_op_index": self.run_op_index,
146       "start_timestamp": self.start_timestamp,
147       "end_timestamp": self.end_timestamp,
148       "received_timestamp": self.received_timestamp,
149       }
150
151   def CalcStatus(self):
152     status = constants.JOB_STATUS_QUEUED
153
154     all_success = True
155     for op in self.ops:
156       if op.status == constants.OP_STATUS_SUCCESS:
157         continue
158
159       all_success = False
160
161       if op.status == constants.OP_STATUS_QUEUED:
162         pass
163       elif op.status == constants.OP_STATUS_WAITLOCK:
164         status = constants.JOB_STATUS_WAITLOCK
165       elif op.status == constants.OP_STATUS_RUNNING:
166         status = constants.JOB_STATUS_RUNNING
167       elif op.status == constants.OP_STATUS_ERROR:
168         status = constants.JOB_STATUS_ERROR
169         # The whole job fails if one opcode failed
170         break
171       elif op.status == constants.OP_STATUS_CANCELED:
172         status = constants.OP_STATUS_CANCELED
173         break
174
175     if all_success:
176       status = constants.JOB_STATUS_SUCCESS
177
178     return status
179
180   def GetLogEntries(self, newer_than):
181     if newer_than is None:
182       serial = -1
183     else:
184       serial = newer_than
185
186     entries = []
187     for op in self.ops:
188       entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
189
190     return entries
191
192
193 class _JobQueueWorker(workerpool.BaseWorker):
194   def _NotifyStart(self):
195     """Mark the opcode as running, not lock-waiting.
196
197     This is called from the mcpu code as a notifier function, when the
198     LU is finally about to start the Exec() method. Of course, to have
199     end-user visible results, the opcode must be initially (before
200     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
201
202     """
203     assert self.queue, "Queue attribute is missing"
204     assert self.opcode, "Opcode attribute is missing"
205
206     self.queue.acquire()
207     try:
208       self.opcode.status = constants.OP_STATUS_RUNNING
209     finally:
210       self.queue.release()
211
212   def RunTask(self, job):
213     """Job executor.
214
215     This functions processes a job. It is closely tied to the _QueuedJob and
216     _QueuedOpCode classes.
217
218     """
219     logging.debug("Worker %s processing job %s",
220                   self.worker_id, job.id)
221     proc = mcpu.Processor(self.pool.queue.context)
222     self.queue = queue = job.queue
223     try:
224       try:
225         count = len(job.ops)
226         for idx, op in enumerate(job.ops):
227           try:
228             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
229
230             queue.acquire()
231             try:
232               job.run_op_index = idx
233               op.status = constants.OP_STATUS_WAITLOCK
234               op.result = None
235               op.start_timestamp = TimeStampNow()
236               if idx == 0: # first opcode
237                 job.start_timestamp = op.start_timestamp
238               queue.UpdateJobUnlocked(job)
239
240               input_opcode = op.input
241             finally:
242               queue.release()
243
244             def _Log(*args):
245               """Append a log entry.
246
247               """
248               assert len(args) < 3
249
250               if len(args) == 1:
251                 log_type = constants.ELOG_MESSAGE
252                 log_msg = args[0]
253               else:
254                 log_type, log_msg = args
255
256               # The time is split to make serialization easier and not lose
257               # precision.
258               timestamp = utils.SplitTime(time.time())
259
260               queue.acquire()
261               try:
262                 job.log_serial += 1
263                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
264
265                 job.change.notifyAll()
266               finally:
267                 queue.release()
268
269             # Make sure not to hold lock while _Log is called
270             self.opcode = op
271             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
272
273             queue.acquire()
274             try:
275               op.status = constants.OP_STATUS_SUCCESS
276               op.result = result
277               op.end_timestamp = TimeStampNow()
278               queue.UpdateJobUnlocked(job)
279             finally:
280               queue.release()
281
282             logging.debug("Op %s/%s: Successfully finished %s",
283                           idx + 1, count, op)
284           except Exception, err:
285             queue.acquire()
286             try:
287               try:
288                 op.status = constants.OP_STATUS_ERROR
289                 op.result = str(err)
290                 op.end_timestamp = TimeStampNow()
291                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
292               finally:
293                 queue.UpdateJobUnlocked(job)
294             finally:
295               queue.release()
296             raise
297
298       except errors.GenericError, err:
299         logging.exception("Ganeti exception")
300       except:
301         logging.exception("Unhandled exception")
302     finally:
303       queue.acquire()
304       try:
305         try:
306           job.run_op_idx = -1
307           job.end_timestamp = TimeStampNow()
308           queue.UpdateJobUnlocked(job)
309         finally:
310           job_id = job.id
311           status = job.CalcStatus()
312       finally:
313         queue.release()
314       logging.debug("Worker %s finished job %s, status = %s",
315                     self.worker_id, job_id, status)
316
317
318 class _JobQueueWorkerPool(workerpool.WorkerPool):
319   def __init__(self, queue):
320     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
321                                               _JobQueueWorker)
322     self.queue = queue
323
324
325 class JobQueue(object):
326   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
327
328   def _RequireOpenQueue(fn):
329     """Decorator for "public" functions.
330
331     This function should be used for all "public" functions. That is, functions
332     usually called from other classes.
333
334     Important: Use this decorator only after utils.LockedMethod!
335
336     Example:
337       @utils.LockedMethod
338       @_RequireOpenQueue
339       def Example(self):
340         pass
341
342     """
343     def wrapper(self, *args, **kwargs):
344       assert self._queue_lock is not None, "Queue should be open"
345       return fn(self, *args, **kwargs)
346     return wrapper
347
348   def __init__(self, context):
349     self.context = context
350     self._memcache = weakref.WeakValueDictionary()
351     self._my_hostname = utils.HostInfo().name
352
353     # Locking
354     self._lock = threading.Lock()
355     self.acquire = self._lock.acquire
356     self.release = self._lock.release
357
358     # Initialize
359     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
360
361     # Read serial file
362     self._last_serial = jstore.ReadSerial()
363     assert self._last_serial is not None, ("Serial file was modified between"
364                                            " check in jstore and here")
365
366     # Get initial list of nodes
367     self._nodes = dict((n.name, n.primary_ip)
368                        for n in self.context.cfg.GetAllNodesInfo().values())
369
370     # Remove master node
371     try:
372       del self._nodes[self._my_hostname]
373     except ValueError:
374       pass
375
376     # TODO: Check consistency across nodes
377
378     # Setup worker pool
379     self._wpool = _JobQueueWorkerPool(self)
380
381     # We need to lock here because WorkerPool.AddTask() may start a job while
382     # we're still doing our work.
383     self.acquire()
384     try:
385       for job in self._GetJobsUnlocked(None):
386         # a failure in loading the job can cause 'None' to be returned
387         if job is None:
388           continue
389
390         status = job.CalcStatus()
391
392         if status in (constants.JOB_STATUS_QUEUED, ):
393           self._wpool.AddTask(job)
394
395         elif status in (constants.JOB_STATUS_RUNNING,
396                         constants.JOB_STATUS_WAITLOCK):
397           logging.warning("Unfinished job %s found: %s", job.id, job)
398           try:
399             for op in job.ops:
400               op.status = constants.OP_STATUS_ERROR
401               op.result = "Unclean master daemon shutdown"
402           finally:
403             self.UpdateJobUnlocked(job)
404     finally:
405       self.release()
406
407   @utils.LockedMethod
408   @_RequireOpenQueue
409   def AddNode(self, node):
410     """Register a new node with the queue.
411
412     @type node: L{objects.Node}
413     @param node: the node object to be added
414
415     """
416     node_name = node.name
417     assert node_name != self._my_hostname
418
419     # Clean queue directory on added node
420     RpcRunner.call_jobqueue_purge(node_name)
421
422     # Upload the whole queue excluding archived jobs
423     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
424
425     # Upload current serial file
426     files.append(constants.JOB_QUEUE_SERIAL_FILE)
427
428     for file_name in files:
429       # Read file content
430       fd = open(file_name, "r")
431       try:
432         content = fd.read()
433       finally:
434         fd.close()
435
436       result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
437                                               file_name, content)
438       if not result[node_name]:
439         logging.error("Failed to upload %s to %s", file_name, node_name)
440
441     self._nodes[node_name] = node.primary_ip
442
443   @utils.LockedMethod
444   @_RequireOpenQueue
445   def RemoveNode(self, node_name):
446     try:
447       # The queue is removed by the "leave node" RPC call.
448       del self._nodes[node_name]
449     except KeyError:
450       pass
451
452   def _CheckRpcResult(self, result, nodes, failmsg):
453     failed = []
454     success = []
455
456     for node in nodes:
457       if result[node]:
458         success.append(node)
459       else:
460         failed.append(node)
461
462     if failed:
463       logging.error("%s failed on %s", failmsg, ", ".join(failed))
464
465     # +1 for the master node
466     if (len(success) + 1) < len(failed):
467       # TODO: Handle failing nodes
468       logging.error("More than half of the nodes failed")
469
470   def _GetNodeIp(self):
471     """Helper for returning the node name/ip list.
472
473     """
474     name_list = self._nodes.keys()
475     addr_list = [self._nodes[name] for name in name_list]
476     return name_list, addr_list
477
478   def _WriteAndReplicateFileUnlocked(self, file_name, data):
479     """Writes a file locally and then replicates it to all nodes.
480
481     """
482     utils.WriteFile(file_name, data=data)
483
484     names, addrs = self._GetNodeIp()
485     result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
486     self._CheckRpcResult(result, self._nodes,
487                          "Updating %s" % file_name)
488
489   def _RenameFileUnlocked(self, old, new):
490     os.rename(old, new)
491
492     names, addrs = self._GetNodeIp()
493     result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
494     self._CheckRpcResult(result, self._nodes,
495                          "Moving %s to %s" % (old, new))
496
497   def _FormatJobID(self, job_id):
498     if not isinstance(job_id, (int, long)):
499       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
500     if job_id < 0:
501       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
502
503     return str(job_id)
504
505   def _NewSerialUnlocked(self):
506     """Generates a new job identifier.
507
508     Job identifiers are unique during the lifetime of a cluster.
509
510     Returns: A string representing the job identifier.
511
512     """
513     # New number
514     serial = self._last_serial + 1
515
516     # Write to file
517     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
518                                         "%s\n" % serial)
519
520     # Keep it only if we were able to write the file
521     self._last_serial = serial
522
523     return self._FormatJobID(serial)
524
525   @staticmethod
526   def _GetJobPath(job_id):
527     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
528
529   @staticmethod
530   def _GetArchivedJobPath(job_id):
531     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
532
533   @classmethod
534   def _ExtractJobID(cls, name):
535     m = cls._RE_JOB_FILE.match(name)
536     if m:
537       return m.group(1)
538     else:
539       return None
540
541   def _GetJobIDsUnlocked(self, archived=False):
542     """Return all known job IDs.
543
544     If the parameter archived is True, archived jobs IDs will be
545     included. Currently this argument is unused.
546
547     The method only looks at disk because it's a requirement that all
548     jobs are present on disk (so in the _memcache we don't have any
549     extra IDs).
550
551     """
552     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
553     jlist = utils.NiceSort(jlist)
554     return jlist
555
556   def _ListJobFiles(self):
557     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
558             if self._RE_JOB_FILE.match(name)]
559
560   def _LoadJobUnlocked(self, job_id):
561     job = self._memcache.get(job_id, None)
562     if job:
563       logging.debug("Found job %s in memcache", job_id)
564       return job
565
566     filepath = self._GetJobPath(job_id)
567     logging.debug("Loading job from %s", filepath)
568     try:
569       fd = open(filepath, "r")
570     except IOError, err:
571       if err.errno in (errno.ENOENT, ):
572         return None
573       raise
574     try:
575       data = serializer.LoadJson(fd.read())
576     finally:
577       fd.close()
578
579     try:
580       job = _QueuedJob.Restore(self, data)
581     except Exception, err:
582       new_path = self._GetArchivedJobPath(job_id)
583       if filepath == new_path:
584         # job already archived (future case)
585         logging.exception("Can't parse job %s", job_id)
586       else:
587         # non-archived case
588         logging.exception("Can't parse job %s, will archive.", job_id)
589         self._RenameFileUnlocked(filepath, new_path)
590       return None
591
592     self._memcache[job_id] = job
593     logging.debug("Added job %s to the cache", job_id)
594     return job
595
596   def _GetJobsUnlocked(self, job_ids):
597     if not job_ids:
598       job_ids = self._GetJobIDsUnlocked()
599
600     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
601
602   @staticmethod
603   def _IsQueueMarkedDrain():
604     """Check if the queue is marked from drain.
605
606     This currently uses the queue drain file, which makes it a
607     per-node flag. In the future this can be moved to the config file.
608
609     """
610     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
611
612   @staticmethod
613   def SetDrainFlag(drain_flag):
614     """Sets the drain flag for the queue.
615
616     This is similar to the function L{backend.JobQueueSetDrainFlag},
617     and in the future we might merge them.
618
619     """
620     if drain_flag:
621       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
622     else:
623       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
624     return True
625
626   @utils.LockedMethod
627   @_RequireOpenQueue
628   def SubmitJob(self, ops):
629     """Create and store a new job.
630
631     This enters the job into our job queue and also puts it on the new
632     queue, in order for it to be picked up by the queue processors.
633
634     @type ops: list
635     @param ops: The list of OpCodes that will become the new job.
636
637     """
638     if self._IsQueueMarkedDrain():
639       raise errors.JobQueueDrainError()
640     # Get job identifier
641     job_id = self._NewSerialUnlocked()
642     job = _QueuedJob(self, job_id, ops)
643
644     # Write to disk
645     self.UpdateJobUnlocked(job)
646
647     logging.debug("Adding new job %s to the cache", job_id)
648     self._memcache[job_id] = job
649
650     # Add to worker pool
651     self._wpool.AddTask(job)
652
653     return job.id
654
655   @_RequireOpenQueue
656   def UpdateJobUnlocked(self, job):
657     filename = self._GetJobPath(job.id)
658     data = serializer.DumpJson(job.Serialize(), indent=False)
659     logging.debug("Writing job %s to %s", job.id, filename)
660     self._WriteAndReplicateFileUnlocked(filename, data)
661
662     # Notify waiters about potential changes
663     job.change.notifyAll()
664
665   @utils.LockedMethod
666   @_RequireOpenQueue
667   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
668                         timeout):
669     """Waits for changes in a job.
670
671     @type job_id: string
672     @param job_id: Job identifier
673     @type fields: list of strings
674     @param fields: Which fields to check for changes
675     @type prev_job_info: list or None
676     @param prev_job_info: Last job information returned
677     @type prev_log_serial: int
678     @param prev_log_serial: Last job message serial number
679     @type timeout: float
680     @param timeout: maximum time to wait
681
682     """
683     logging.debug("Waiting for changes in job %s", job_id)
684     end_time = time.time() + timeout
685     while True:
686       delta_time = end_time - time.time()
687       if delta_time < 0:
688         return constants.JOB_NOTCHANGED
689
690       job = self._LoadJobUnlocked(job_id)
691       if not job:
692         logging.debug("Job %s not found", job_id)
693         break
694
695       status = job.CalcStatus()
696       job_info = self._GetJobInfoUnlocked(job, fields)
697       log_entries = job.GetLogEntries(prev_log_serial)
698
699       # Serializing and deserializing data can cause type changes (e.g. from
700       # tuple to list) or precision loss. We're doing it here so that we get
701       # the same modifications as the data received from the client. Without
702       # this, the comparison afterwards might fail without the data being
703       # significantly different.
704       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
705       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
706
707       if status not in (constants.JOB_STATUS_QUEUED,
708                         constants.JOB_STATUS_RUNNING,
709                         constants.JOB_STATUS_WAITLOCK):
710         # Don't even try to wait if the job is no longer running, there will be
711         # no changes.
712         break
713
714       if (prev_job_info != job_info or
715           (log_entries and prev_log_serial != log_entries[0][0])):
716         break
717
718       logging.debug("Waiting again")
719
720       # Release the queue lock while waiting
721       job.change.wait(delta_time)
722
723     logging.debug("Job %s changed", job_id)
724
725     return (job_info, log_entries)
726
727   @utils.LockedMethod
728   @_RequireOpenQueue
729   def CancelJob(self, job_id):
730     """Cancels a job.
731
732     @type job_id: string
733     @param job_id: Job ID of job to be cancelled.
734
735     """
736     logging.debug("Cancelling job %s", job_id)
737
738     job = self._LoadJobUnlocked(job_id)
739     if not job:
740       logging.debug("Job %s not found", job_id)
741       return
742
743     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
744       logging.debug("Job %s is no longer in the queue", job.id)
745       return
746
747     try:
748       for op in job.ops:
749         op.status = constants.OP_STATUS_ERROR
750         op.result = "Job cancelled by request"
751     finally:
752       self.UpdateJobUnlocked(job)
753
754   @_RequireOpenQueue
755   def _ArchiveJobUnlocked(self, job_id):
756     """Archives a job.
757
758     @type job_id: string
759     @param job_id: Job ID of job to be archived.
760
761     """
762     logging.info("Archiving job %s", job_id)
763
764     job = self._LoadJobUnlocked(job_id)
765     if not job:
766       logging.debug("Job %s not found", job_id)
767       return
768
769     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
770                                 constants.JOB_STATUS_SUCCESS,
771                                 constants.JOB_STATUS_ERROR):
772       logging.debug("Job %s is not yet done", job.id)
773       return
774
775     old = self._GetJobPath(job.id)
776     new = self._GetArchivedJobPath(job.id)
777
778     self._RenameFileUnlocked(old, new)
779
780     logging.debug("Successfully archived job %s", job.id)
781
782   @utils.LockedMethod
783   @_RequireOpenQueue
784   def ArchiveJob(self, job_id):
785     """Archives a job.
786
787     @type job_id: string
788     @param job_id: Job ID of job to be archived.
789
790     """
791     return self._ArchiveJobUnlocked(job_id)
792
793   @utils.LockedMethod
794   @_RequireOpenQueue
795   def AutoArchiveJobs(self, age):
796     """Archives all jobs based on age.
797
798     The method will archive all jobs which are older than the age
799     parameter. For jobs that don't have an end timestamp, the start
800     timestamp will be considered. The special '-1' age will cause
801     archival of all jobs (that are not running or queued).
802
803     @type age: int
804     @param age: the minimum age in seconds
805
806     """
807     logging.info("Archiving jobs with age more than %s seconds", age)
808
809     now = time.time()
810     for jid in self._GetJobIDsUnlocked(archived=False):
811       job = self._LoadJobUnlocked(jid)
812       if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
813                                   constants.OP_STATUS_ERROR,
814                                   constants.OP_STATUS_CANCELED):
815         continue
816       if job.end_timestamp is None:
817         if job.start_timestamp is None:
818           job_age = job.received_timestamp
819         else:
820           job_age = job.start_timestamp
821       else:
822         job_age = job.end_timestamp
823
824       if age == -1 or now - job_age[0] > age:
825         self._ArchiveJobUnlocked(jid)
826
827   def _GetJobInfoUnlocked(self, job, fields):
828     row = []
829     for fname in fields:
830       if fname == "id":
831         row.append(job.id)
832       elif fname == "status":
833         row.append(job.CalcStatus())
834       elif fname == "ops":
835         row.append([op.input.__getstate__() for op in job.ops])
836       elif fname == "opresult":
837         row.append([op.result for op in job.ops])
838       elif fname == "opstatus":
839         row.append([op.status for op in job.ops])
840       elif fname == "oplog":
841         row.append([op.log for op in job.ops])
842       elif fname == "opstart":
843         row.append([op.start_timestamp for op in job.ops])
844       elif fname == "opend":
845         row.append([op.end_timestamp for op in job.ops])
846       elif fname == "received_ts":
847         row.append(job.received_timestamp)
848       elif fname == "start_ts":
849         row.append(job.start_timestamp)
850       elif fname == "end_ts":
851         row.append(job.end_timestamp)
852       elif fname == "summary":
853         row.append([op.input.Summary() for op in job.ops])
854       else:
855         raise errors.OpExecError("Invalid job query field '%s'" % fname)
856     return row
857
858   @utils.LockedMethod
859   @_RequireOpenQueue
860   def QueryJobs(self, job_ids, fields):
861     """Returns a list of jobs in queue.
862
863     Args:
864     - job_ids: Sequence of job identifiers or None for all
865     - fields: Names of fields to return
866
867     """
868     jobs = []
869
870     for job in self._GetJobsUnlocked(job_ids):
871       if job is None:
872         jobs.append(None)
873       else:
874         jobs.append(self._GetJobInfoUnlocked(job, fields))
875
876     return jobs
877
878   @utils.LockedMethod
879   @_RequireOpenQueue
880   def Shutdown(self):
881     """Stops the job queue.
882
883     """
884     self._wpool.TerminateWorkers()
885
886     self._queue_lock.Close()
887     self._queue_lock = None