jqueue: Add “writable” flag to memory objects
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import re
35 import time
36 import weakref
37 import threading
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60
61
62 JOBQUEUE_THREADS = 25
63 JOBS_PER_ARCHIVE_DIRECTORY = 10000
64
65 # member lock names to be passed to @ssynchronized decorator
66 _LOCK = "_lock"
67 _QUEUE = "_queue"
68
69
70 class CancelJob(Exception):
71   """Special exception to cancel a job.
72
73   """
74
75
76 def TimeStampNow():
77   """Returns the current timestamp.
78
79   @rtype: tuple
80   @return: the current time in the (seconds, microseconds) format
81
82   """
83   return utils.SplitTime(time.time())
84
85
86 class _QueuedOpCode(object):
87   """Encapsulates an opcode object.
88
89   @ivar log: holds the execution log and consists of tuples
90   of the form C{(log_serial, timestamp, level, message)}
91   @ivar input: the OpCode we encapsulate
92   @ivar status: the current status
93   @ivar result: the result of the LU execution
94   @ivar start_timestamp: timestamp for the start of the execution
95   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96   @ivar stop_timestamp: timestamp for the end of the execution
97
98   """
99   __slots__ = ["input", "status", "result", "log", "priority",
100                "start_timestamp", "exec_timestamp", "end_timestamp",
101                "__weakref__"]
102
103   def __init__(self, op):
104     """Constructor for the _QuededOpCode.
105
106     @type op: L{opcodes.OpCode}
107     @param op: the opcode we encapsulate
108
109     """
110     self.input = op
111     self.status = constants.OP_STATUS_QUEUED
112     self.result = None
113     self.log = []
114     self.start_timestamp = None
115     self.exec_timestamp = None
116     self.end_timestamp = None
117
118     # Get initial priority (it might change during the lifetime of this opcode)
119     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120
121   @classmethod
122   def Restore(cls, state):
123     """Restore the _QueuedOpCode from the serialized form.
124
125     @type state: dict
126     @param state: the serialized state
127     @rtype: _QueuedOpCode
128     @return: a new _QueuedOpCode instance
129
130     """
131     obj = _QueuedOpCode.__new__(cls)
132     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133     obj.status = state["status"]
134     obj.result = state["result"]
135     obj.log = state["log"]
136     obj.start_timestamp = state.get("start_timestamp", None)
137     obj.exec_timestamp = state.get("exec_timestamp", None)
138     obj.end_timestamp = state.get("end_timestamp", None)
139     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140     return obj
141
142   def Serialize(self):
143     """Serializes this _QueuedOpCode.
144
145     @rtype: dict
146     @return: the dictionary holding the serialized state
147
148     """
149     return {
150       "input": self.input.__getstate__(),
151       "status": self.status,
152       "result": self.result,
153       "log": self.log,
154       "start_timestamp": self.start_timestamp,
155       "exec_timestamp": self.exec_timestamp,
156       "end_timestamp": self.end_timestamp,
157       "priority": self.priority,
158       }
159
160
161 class _QueuedJob(object):
162   """In-memory job representation.
163
164   This is what we use to track the user-submitted jobs. Locking must
165   be taken care of by users of this class.
166
167   @type queue: L{JobQueue}
168   @ivar queue: the parent queue
169   @ivar id: the job ID
170   @type ops: list
171   @ivar ops: the list of _QueuedOpCode that constitute the job
172   @type log_serial: int
173   @ivar log_serial: holds the index for the next log entry
174   @ivar received_timestamp: the timestamp for when the job was received
175   @ivar start_timestmap: the timestamp for start of execution
176   @ivar end_timestamp: the timestamp for end of execution
177   @ivar writable: Whether the job is allowed to be modified
178
179   """
180   # pylint: disable-msg=W0212
181   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182                "received_timestamp", "start_timestamp", "end_timestamp",
183                "__weakref__", "processor_lock", "writable"]
184
185   def __init__(self, queue, job_id, ops, writable):
186     """Constructor for the _QueuedJob.
187
188     @type queue: L{JobQueue}
189     @param queue: our parent queue
190     @type job_id: job_id
191     @param job_id: our job id
192     @type ops: list
193     @param ops: the list of opcodes we hold, which will be encapsulated
194         in _QueuedOpCodes
195     @type writable: bool
196     @param writable: Whether job can be modified
197
198     """
199     if not ops:
200       raise errors.GenericError("A job needs at least one opcode")
201
202     self.queue = queue
203     self.id = job_id
204     self.ops = [_QueuedOpCode(op) for op in ops]
205     self.log_serial = 0
206     self.received_timestamp = TimeStampNow()
207     self.start_timestamp = None
208     self.end_timestamp = None
209
210     self._InitInMemory(self, writable)
211
212   @staticmethod
213   def _InitInMemory(obj, writable):
214     """Initializes in-memory variables.
215
216     """
217     obj.writable = writable
218     obj.ops_iter = None
219     obj.cur_opctx = None
220     obj.processor_lock = threading.Lock()
221
222   def __repr__(self):
223     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
224               "id=%s" % self.id,
225               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
226
227     return "<%s at %#x>" % (" ".join(status), id(self))
228
229   @classmethod
230   def Restore(cls, queue, state, writable):
231     """Restore a _QueuedJob from serialized state:
232
233     @type queue: L{JobQueue}
234     @param queue: to which queue the restored job belongs
235     @type state: dict
236     @param state: the serialized state
237     @type writable: bool
238     @param writable: Whether job can be modified
239     @rtype: _JobQueue
240     @return: the restored _JobQueue instance
241
242     """
243     obj = _QueuedJob.__new__(cls)
244     obj.queue = queue
245     obj.id = state["id"]
246     obj.received_timestamp = state.get("received_timestamp", None)
247     obj.start_timestamp = state.get("start_timestamp", None)
248     obj.end_timestamp = state.get("end_timestamp", None)
249
250     obj.ops = []
251     obj.log_serial = 0
252     for op_state in state["ops"]:
253       op = _QueuedOpCode.Restore(op_state)
254       for log_entry in op.log:
255         obj.log_serial = max(obj.log_serial, log_entry[0])
256       obj.ops.append(op)
257
258     cls._InitInMemory(obj, writable)
259
260     return obj
261
262   def Serialize(self):
263     """Serialize the _JobQueue instance.
264
265     @rtype: dict
266     @return: the serialized state
267
268     """
269     return {
270       "id": self.id,
271       "ops": [op.Serialize() for op in self.ops],
272       "start_timestamp": self.start_timestamp,
273       "end_timestamp": self.end_timestamp,
274       "received_timestamp": self.received_timestamp,
275       }
276
277   def CalcStatus(self):
278     """Compute the status of this job.
279
280     This function iterates over all the _QueuedOpCodes in the job and
281     based on their status, computes the job status.
282
283     The algorithm is:
284       - if we find a cancelled, or finished with error, the job
285         status will be the same
286       - otherwise, the last opcode with the status one of:
287           - waitlock
288           - canceling
289           - running
290
291         will determine the job status
292
293       - otherwise, it means either all opcodes are queued, or success,
294         and the job status will be the same
295
296     @return: the job status
297
298     """
299     status = constants.JOB_STATUS_QUEUED
300
301     all_success = True
302     for op in self.ops:
303       if op.status == constants.OP_STATUS_SUCCESS:
304         continue
305
306       all_success = False
307
308       if op.status == constants.OP_STATUS_QUEUED:
309         pass
310       elif op.status == constants.OP_STATUS_WAITLOCK:
311         status = constants.JOB_STATUS_WAITLOCK
312       elif op.status == constants.OP_STATUS_RUNNING:
313         status = constants.JOB_STATUS_RUNNING
314       elif op.status == constants.OP_STATUS_CANCELING:
315         status = constants.JOB_STATUS_CANCELING
316         break
317       elif op.status == constants.OP_STATUS_ERROR:
318         status = constants.JOB_STATUS_ERROR
319         # The whole job fails if one opcode failed
320         break
321       elif op.status == constants.OP_STATUS_CANCELED:
322         status = constants.OP_STATUS_CANCELED
323         break
324
325     if all_success:
326       status = constants.JOB_STATUS_SUCCESS
327
328     return status
329
330   def CalcPriority(self):
331     """Gets the current priority for this job.
332
333     Only unfinished opcodes are considered. When all are done, the default
334     priority is used.
335
336     @rtype: int
337
338     """
339     priorities = [op.priority for op in self.ops
340                   if op.status not in constants.OPS_FINALIZED]
341
342     if not priorities:
343       # All opcodes are done, assume default priority
344       return constants.OP_PRIO_DEFAULT
345
346     return min(priorities)
347
348   def GetLogEntries(self, newer_than):
349     """Selectively returns the log entries.
350
351     @type newer_than: None or int
352     @param newer_than: if this is None, return all log entries,
353         otherwise return only the log entries with serial higher
354         than this value
355     @rtype: list
356     @return: the list of the log entries selected
357
358     """
359     if newer_than is None:
360       serial = -1
361     else:
362       serial = newer_than
363
364     entries = []
365     for op in self.ops:
366       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
367
368     return entries
369
370   def GetInfo(self, fields):
371     """Returns information about a job.
372
373     @type fields: list
374     @param fields: names of fields to return
375     @rtype: list
376     @return: list with one element for each field
377     @raise errors.OpExecError: when an invalid field
378         has been passed
379
380     """
381     row = []
382     for fname in fields:
383       if fname == "id":
384         row.append(self.id)
385       elif fname == "status":
386         row.append(self.CalcStatus())
387       elif fname == "priority":
388         row.append(self.CalcPriority())
389       elif fname == "ops":
390         row.append([op.input.__getstate__() for op in self.ops])
391       elif fname == "opresult":
392         row.append([op.result for op in self.ops])
393       elif fname == "opstatus":
394         row.append([op.status for op in self.ops])
395       elif fname == "oplog":
396         row.append([op.log for op in self.ops])
397       elif fname == "opstart":
398         row.append([op.start_timestamp for op in self.ops])
399       elif fname == "opexec":
400         row.append([op.exec_timestamp for op in self.ops])
401       elif fname == "opend":
402         row.append([op.end_timestamp for op in self.ops])
403       elif fname == "oppriority":
404         row.append([op.priority for op in self.ops])
405       elif fname == "received_ts":
406         row.append(self.received_timestamp)
407       elif fname == "start_ts":
408         row.append(self.start_timestamp)
409       elif fname == "end_ts":
410         row.append(self.end_timestamp)
411       elif fname == "summary":
412         row.append([op.input.Summary() for op in self.ops])
413       else:
414         raise errors.OpExecError("Invalid self query field '%s'" % fname)
415     return row
416
417   def MarkUnfinishedOps(self, status, result):
418     """Mark unfinished opcodes with a given status and result.
419
420     This is an utility function for marking all running or waiting to
421     be run opcodes with a given status. Opcodes which are already
422     finalised are not changed.
423
424     @param status: a given opcode status
425     @param result: the opcode result
426
427     """
428     not_marked = True
429     for op in self.ops:
430       if op.status in constants.OPS_FINALIZED:
431         assert not_marked, "Finalized opcodes found after non-finalized ones"
432         continue
433       op.status = status
434       op.result = result
435       not_marked = False
436
437   def Finalize(self):
438     """Marks the job as finalized.
439
440     """
441     self.end_timestamp = TimeStampNow()
442
443   def Cancel(self):
444     """Marks job as canceled/-ing if possible.
445
446     @rtype: tuple; (bool, string)
447     @return: Boolean describing whether job was successfully canceled or marked
448       as canceling and a text message
449
450     """
451     status = self.CalcStatus()
452
453     if status == constants.JOB_STATUS_QUEUED:
454       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
455                              "Job canceled by request")
456       self.Finalize()
457       return (True, "Job %s canceled" % self.id)
458
459     elif status == constants.JOB_STATUS_WAITLOCK:
460       # The worker will notice the new status and cancel the job
461       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
462       return (True, "Job %s will be canceled" % self.id)
463
464     else:
465       logging.debug("Job %s is no longer waiting in the queue", self.id)
466       return (False, "Job %s is no longer waiting in the queue" % self.id)
467
468
469 class _OpExecCallbacks(mcpu.OpExecCbBase):
470   def __init__(self, queue, job, op):
471     """Initializes this class.
472
473     @type queue: L{JobQueue}
474     @param queue: Job queue
475     @type job: L{_QueuedJob}
476     @param job: Job object
477     @type op: L{_QueuedOpCode}
478     @param op: OpCode
479
480     """
481     assert queue, "Queue is missing"
482     assert job, "Job is missing"
483     assert op, "Opcode is missing"
484
485     self._queue = queue
486     self._job = job
487     self._op = op
488
489   def _CheckCancel(self):
490     """Raises an exception to cancel the job if asked to.
491
492     """
493     # Cancel here if we were asked to
494     if self._op.status == constants.OP_STATUS_CANCELING:
495       logging.debug("Canceling opcode")
496       raise CancelJob()
497
498   @locking.ssynchronized(_QUEUE, shared=1)
499   def NotifyStart(self):
500     """Mark the opcode as running, not lock-waiting.
501
502     This is called from the mcpu code as a notifier function, when the LU is
503     finally about to start the Exec() method. Of course, to have end-user
504     visible results, the opcode must be initially (before calling into
505     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
506
507     """
508     assert self._op in self._job.ops
509     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
510                                constants.OP_STATUS_CANCELING)
511
512     # Cancel here if we were asked to
513     self._CheckCancel()
514
515     logging.debug("Opcode is now running")
516
517     self._op.status = constants.OP_STATUS_RUNNING
518     self._op.exec_timestamp = TimeStampNow()
519
520     # And finally replicate the job status
521     self._queue.UpdateJobUnlocked(self._job)
522
523   @locking.ssynchronized(_QUEUE, shared=1)
524   def _AppendFeedback(self, timestamp, log_type, log_msg):
525     """Internal feedback append function, with locks
526
527     """
528     self._job.log_serial += 1
529     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
530     self._queue.UpdateJobUnlocked(self._job, replicate=False)
531
532   def Feedback(self, *args):
533     """Append a log entry.
534
535     """
536     assert len(args) < 3
537
538     if len(args) == 1:
539       log_type = constants.ELOG_MESSAGE
540       log_msg = args[0]
541     else:
542       (log_type, log_msg) = args
543
544     # The time is split to make serialization easier and not lose
545     # precision.
546     timestamp = utils.SplitTime(time.time())
547     self._AppendFeedback(timestamp, log_type, log_msg)
548
549   def CheckCancel(self):
550     """Check whether job has been cancelled.
551
552     """
553     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
554                                constants.OP_STATUS_CANCELING)
555
556     # Cancel here if we were asked to
557     self._CheckCancel()
558
559   def SubmitManyJobs(self, jobs):
560     """Submits jobs for processing.
561
562     See L{JobQueue.SubmitManyJobs}.
563
564     """
565     # Locking is done in job queue
566     return self._queue.SubmitManyJobs(jobs)
567
568
569 class _JobChangesChecker(object):
570   def __init__(self, fields, prev_job_info, prev_log_serial):
571     """Initializes this class.
572
573     @type fields: list of strings
574     @param fields: Fields requested by LUXI client
575     @type prev_job_info: string
576     @param prev_job_info: previous job info, as passed by the LUXI client
577     @type prev_log_serial: string
578     @param prev_log_serial: previous job serial, as passed by the LUXI client
579
580     """
581     self._fields = fields
582     self._prev_job_info = prev_job_info
583     self._prev_log_serial = prev_log_serial
584
585   def __call__(self, job):
586     """Checks whether job has changed.
587
588     @type job: L{_QueuedJob}
589     @param job: Job object
590
591     """
592     assert not job.writable, "Expected read-only job"
593
594     status = job.CalcStatus()
595     job_info = job.GetInfo(self._fields)
596     log_entries = job.GetLogEntries(self._prev_log_serial)
597
598     # Serializing and deserializing data can cause type changes (e.g. from
599     # tuple to list) or precision loss. We're doing it here so that we get
600     # the same modifications as the data received from the client. Without
601     # this, the comparison afterwards might fail without the data being
602     # significantly different.
603     # TODO: we just deserialized from disk, investigate how to make sure that
604     # the job info and log entries are compatible to avoid this further step.
605     # TODO: Doing something like in testutils.py:UnifyValueType might be more
606     # efficient, though floats will be tricky
607     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
608     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
609
610     # Don't even try to wait if the job is no longer running, there will be
611     # no changes.
612     if (status not in (constants.JOB_STATUS_QUEUED,
613                        constants.JOB_STATUS_RUNNING,
614                        constants.JOB_STATUS_WAITLOCK) or
615         job_info != self._prev_job_info or
616         (log_entries and self._prev_log_serial != log_entries[0][0])):
617       logging.debug("Job %s changed", job.id)
618       return (job_info, log_entries)
619
620     return None
621
622
623 class _JobFileChangesWaiter(object):
624   def __init__(self, filename):
625     """Initializes this class.
626
627     @type filename: string
628     @param filename: Path to job file
629     @raises errors.InotifyError: if the notifier cannot be setup
630
631     """
632     self._wm = pyinotify.WatchManager()
633     self._inotify_handler = \
634       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
635     self._notifier = \
636       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
637     try:
638       self._inotify_handler.enable()
639     except Exception:
640       # pyinotify doesn't close file descriptors automatically
641       self._notifier.stop()
642       raise
643
644   def _OnInotify(self, notifier_enabled):
645     """Callback for inotify.
646
647     """
648     if not notifier_enabled:
649       self._inotify_handler.enable()
650
651   def Wait(self, timeout):
652     """Waits for the job file to change.
653
654     @type timeout: float
655     @param timeout: Timeout in seconds
656     @return: Whether there have been events
657
658     """
659     assert timeout >= 0
660     have_events = self._notifier.check_events(timeout * 1000)
661     if have_events:
662       self._notifier.read_events()
663     self._notifier.process_events()
664     return have_events
665
666   def Close(self):
667     """Closes underlying notifier and its file descriptor.
668
669     """
670     self._notifier.stop()
671
672
673 class _JobChangesWaiter(object):
674   def __init__(self, filename):
675     """Initializes this class.
676
677     @type filename: string
678     @param filename: Path to job file
679
680     """
681     self._filewaiter = None
682     self._filename = filename
683
684   def Wait(self, timeout):
685     """Waits for a job to change.
686
687     @type timeout: float
688     @param timeout: Timeout in seconds
689     @return: Whether there have been events
690
691     """
692     if self._filewaiter:
693       return self._filewaiter.Wait(timeout)
694
695     # Lazy setup: Avoid inotify setup cost when job file has already changed.
696     # If this point is reached, return immediately and let caller check the job
697     # file again in case there were changes since the last check. This avoids a
698     # race condition.
699     self._filewaiter = _JobFileChangesWaiter(self._filename)
700
701     return True
702
703   def Close(self):
704     """Closes underlying waiter.
705
706     """
707     if self._filewaiter:
708       self._filewaiter.Close()
709
710
711 class _WaitForJobChangesHelper(object):
712   """Helper class using inotify to wait for changes in a job file.
713
714   This class takes a previous job status and serial, and alerts the client when
715   the current job status has changed.
716
717   """
718   @staticmethod
719   def _CheckForChanges(job_load_fn, check_fn):
720     job = job_load_fn()
721     if not job:
722       raise errors.JobLost()
723
724     result = check_fn(job)
725     if result is None:
726       raise utils.RetryAgain()
727
728     return result
729
730   def __call__(self, filename, job_load_fn,
731                fields, prev_job_info, prev_log_serial, timeout):
732     """Waits for changes on a job.
733
734     @type filename: string
735     @param filename: File on which to wait for changes
736     @type job_load_fn: callable
737     @param job_load_fn: Function to load job
738     @type fields: list of strings
739     @param fields: Which fields to check for changes
740     @type prev_job_info: list or None
741     @param prev_job_info: Last job information returned
742     @type prev_log_serial: int
743     @param prev_log_serial: Last job message serial number
744     @type timeout: float
745     @param timeout: maximum time to wait in seconds
746
747     """
748     try:
749       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
750       waiter = _JobChangesWaiter(filename)
751       try:
752         return utils.Retry(compat.partial(self._CheckForChanges,
753                                           job_load_fn, check_fn),
754                            utils.RETRY_REMAINING_TIME, timeout,
755                            wait_fn=waiter.Wait)
756       finally:
757         waiter.Close()
758     except (errors.InotifyError, errors.JobLost):
759       return None
760     except utils.RetryTimeout:
761       return constants.JOB_NOTCHANGED
762
763
764 def _EncodeOpError(err):
765   """Encodes an error which occurred while processing an opcode.
766
767   """
768   if isinstance(err, errors.GenericError):
769     to_encode = err
770   else:
771     to_encode = errors.OpExecError(str(err))
772
773   return errors.EncodeException(to_encode)
774
775
776 class _TimeoutStrategyWrapper:
777   def __init__(self, fn):
778     """Initializes this class.
779
780     """
781     self._fn = fn
782     self._next = None
783
784   def _Advance(self):
785     """Gets the next timeout if necessary.
786
787     """
788     if self._next is None:
789       self._next = self._fn()
790
791   def Peek(self):
792     """Returns the next timeout.
793
794     """
795     self._Advance()
796     return self._next
797
798   def Next(self):
799     """Returns the current timeout and advances the internal state.
800
801     """
802     self._Advance()
803     result = self._next
804     self._next = None
805     return result
806
807
808 class _OpExecContext:
809   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
810     """Initializes this class.
811
812     """
813     self.op = op
814     self.index = index
815     self.log_prefix = log_prefix
816     self.summary = op.input.Summary()
817
818     # Create local copy to modify
819     if getattr(op.input, opcodes.DEPEND_ATTR, None):
820       self.jobdeps = op.input.depends[:]
821     else:
822       self.jobdeps = None
823
824     self._timeout_strategy_factory = timeout_strategy_factory
825     self._ResetTimeoutStrategy()
826
827   def _ResetTimeoutStrategy(self):
828     """Creates a new timeout strategy.
829
830     """
831     self._timeout_strategy = \
832       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
833
834   def CheckPriorityIncrease(self):
835     """Checks whether priority can and should be increased.
836
837     Called when locks couldn't be acquired.
838
839     """
840     op = self.op
841
842     # Exhausted all retries and next round should not use blocking acquire
843     # for locks?
844     if (self._timeout_strategy.Peek() is None and
845         op.priority > constants.OP_PRIO_HIGHEST):
846       logging.debug("Increasing priority")
847       op.priority -= 1
848       self._ResetTimeoutStrategy()
849       return True
850
851     return False
852
853   def GetNextLockTimeout(self):
854     """Returns the next lock acquire timeout.
855
856     """
857     return self._timeout_strategy.Next()
858
859
860 class _JobProcessor(object):
861   def __init__(self, queue, opexec_fn, job,
862                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
863     """Initializes this class.
864
865     """
866     self.queue = queue
867     self.opexec_fn = opexec_fn
868     self.job = job
869     self._timeout_strategy_factory = _timeout_strategy_factory
870
871   @staticmethod
872   def _FindNextOpcode(job, timeout_strategy_factory):
873     """Locates the next opcode to run.
874
875     @type job: L{_QueuedJob}
876     @param job: Job object
877     @param timeout_strategy_factory: Callable to create new timeout strategy
878
879     """
880     # Create some sort of a cache to speed up locating next opcode for future
881     # lookups
882     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
883     # pending and one for processed ops.
884     if job.ops_iter is None:
885       job.ops_iter = enumerate(job.ops)
886
887     # Find next opcode to run
888     while True:
889       try:
890         (idx, op) = job.ops_iter.next()
891       except StopIteration:
892         raise errors.ProgrammerError("Called for a finished job")
893
894       if op.status == constants.OP_STATUS_RUNNING:
895         # Found an opcode already marked as running
896         raise errors.ProgrammerError("Called for job marked as running")
897
898       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
899                              timeout_strategy_factory)
900
901       if op.status not in constants.OPS_FINALIZED:
902         return opctx
903
904       # This is a job that was partially completed before master daemon
905       # shutdown, so it can be expected that some opcodes are already
906       # completed successfully (if any did error out, then the whole job
907       # should have been aborted and not resubmitted for processing).
908       logging.info("%s: opcode %s already processed, skipping",
909                    opctx.log_prefix, opctx.summary)
910
911   @staticmethod
912   def _MarkWaitlock(job, op):
913     """Marks an opcode as waiting for locks.
914
915     The job's start timestamp is also set if necessary.
916
917     @type job: L{_QueuedJob}
918     @param job: Job object
919     @type op: L{_QueuedOpCode}
920     @param op: Opcode object
921
922     """
923     assert op in job.ops
924     assert op.status in (constants.OP_STATUS_QUEUED,
925                          constants.OP_STATUS_WAITLOCK)
926
927     update = False
928
929     op.result = None
930
931     if op.status == constants.OP_STATUS_QUEUED:
932       op.status = constants.OP_STATUS_WAITLOCK
933       update = True
934
935     if op.start_timestamp is None:
936       op.start_timestamp = TimeStampNow()
937       update = True
938
939     if job.start_timestamp is None:
940       job.start_timestamp = op.start_timestamp
941       update = True
942
943     assert op.status == constants.OP_STATUS_WAITLOCK
944
945     return update
946
947   @staticmethod
948   def _CheckDependencies(queue, job, opctx):
949     """Checks if an opcode has dependencies and if so, processes them.
950
951     @type queue: L{JobQueue}
952     @param queue: Queue object
953     @type job: L{_QueuedJob}
954     @param job: Job object
955     @type opctx: L{_OpExecContext}
956     @param opctx: Opcode execution context
957     @rtype: bool
958     @return: Whether opcode will be re-scheduled by dependency tracker
959
960     """
961     op = opctx.op
962
963     result = False
964
965     while opctx.jobdeps:
966       (dep_job_id, dep_status) = opctx.jobdeps[0]
967
968       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
969                                                           dep_status)
970       assert ht.TNonEmptyString(depmsg), "No dependency message"
971
972       logging.info("%s: %s", opctx.log_prefix, depmsg)
973
974       if depresult == _JobDependencyManager.CONTINUE:
975         # Remove dependency and continue
976         opctx.jobdeps.pop(0)
977
978       elif depresult == _JobDependencyManager.WAIT:
979         # Need to wait for notification, dependency tracker will re-add job
980         # to workerpool
981         result = True
982         break
983
984       elif depresult == _JobDependencyManager.CANCEL:
985         # Job was cancelled, cancel this job as well
986         job.Cancel()
987         assert op.status == constants.OP_STATUS_CANCELING
988         break
989
990       elif depresult in (_JobDependencyManager.WRONGSTATUS,
991                          _JobDependencyManager.ERROR):
992         # Job failed or there was an error, this job must fail
993         op.status = constants.OP_STATUS_ERROR
994         op.result = _EncodeOpError(errors.OpExecError(depmsg))
995         break
996
997       else:
998         raise errors.ProgrammerError("Unknown dependency result '%s'" %
999                                      depresult)
1000
1001     return result
1002
1003   def _ExecOpCodeUnlocked(self, opctx):
1004     """Processes one opcode and returns the result.
1005
1006     """
1007     op = opctx.op
1008
1009     assert op.status == constants.OP_STATUS_WAITLOCK
1010
1011     timeout = opctx.GetNextLockTimeout()
1012
1013     try:
1014       # Make sure not to hold queue lock while calling ExecOpCode
1015       result = self.opexec_fn(op.input,
1016                               _OpExecCallbacks(self.queue, self.job, op),
1017                               timeout=timeout, priority=op.priority)
1018     except mcpu.LockAcquireTimeout:
1019       assert timeout is not None, "Received timeout for blocking acquire"
1020       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1021
1022       assert op.status in (constants.OP_STATUS_WAITLOCK,
1023                            constants.OP_STATUS_CANCELING)
1024
1025       # Was job cancelled while we were waiting for the lock?
1026       if op.status == constants.OP_STATUS_CANCELING:
1027         return (constants.OP_STATUS_CANCELING, None)
1028
1029       # Stay in waitlock while trying to re-acquire lock
1030       return (constants.OP_STATUS_WAITLOCK, None)
1031     except CancelJob:
1032       logging.exception("%s: Canceling job", opctx.log_prefix)
1033       assert op.status == constants.OP_STATUS_CANCELING
1034       return (constants.OP_STATUS_CANCELING, None)
1035     except Exception, err: # pylint: disable-msg=W0703
1036       logging.exception("%s: Caught exception in %s",
1037                         opctx.log_prefix, opctx.summary)
1038       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1039     else:
1040       logging.debug("%s: %s successful",
1041                     opctx.log_prefix, opctx.summary)
1042       return (constants.OP_STATUS_SUCCESS, result)
1043
1044   def __call__(self, _nextop_fn=None):
1045     """Continues execution of a job.
1046
1047     @param _nextop_fn: Callback function for tests
1048     @rtype: bool
1049     @return: True if job is finished, False if processor needs to be called
1050              again
1051
1052     """
1053     queue = self.queue
1054     job = self.job
1055
1056     logging.debug("Processing job %s", job.id)
1057
1058     queue.acquire(shared=1)
1059     try:
1060       opcount = len(job.ops)
1061
1062       assert job.writable, "Expected writable job"
1063
1064       # Don't do anything for finalized jobs
1065       if job.CalcStatus() in constants.JOBS_FINALIZED:
1066         return True
1067
1068       # Is a previous opcode still pending?
1069       if job.cur_opctx:
1070         opctx = job.cur_opctx
1071         job.cur_opctx = None
1072       else:
1073         if __debug__ and _nextop_fn:
1074           _nextop_fn()
1075         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1076
1077       op = opctx.op
1078
1079       # Consistency check
1080       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1081                                      constants.OP_STATUS_CANCELING)
1082                         for i in job.ops[opctx.index + 1:])
1083
1084       assert op.status in (constants.OP_STATUS_QUEUED,
1085                            constants.OP_STATUS_WAITLOCK,
1086                            constants.OP_STATUS_CANCELING)
1087
1088       assert (op.priority <= constants.OP_PRIO_LOWEST and
1089               op.priority >= constants.OP_PRIO_HIGHEST)
1090
1091       waitjob = None
1092
1093       if op.status != constants.OP_STATUS_CANCELING:
1094         assert op.status in (constants.OP_STATUS_QUEUED,
1095                              constants.OP_STATUS_WAITLOCK)
1096
1097         # Prepare to start opcode
1098         if self._MarkWaitlock(job, op):
1099           # Write to disk
1100           queue.UpdateJobUnlocked(job)
1101
1102         assert op.status == constants.OP_STATUS_WAITLOCK
1103         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1104         assert job.start_timestamp and op.start_timestamp
1105         assert waitjob is None
1106
1107         # Check if waiting for a job is necessary
1108         waitjob = self._CheckDependencies(queue, job, opctx)
1109
1110         assert op.status in (constants.OP_STATUS_WAITLOCK,
1111                              constants.OP_STATUS_CANCELING,
1112                              constants.OP_STATUS_ERROR)
1113
1114         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1115                                          constants.OP_STATUS_ERROR)):
1116           logging.info("%s: opcode %s waiting for locks",
1117                        opctx.log_prefix, opctx.summary)
1118
1119           assert not opctx.jobdeps, "Not all dependencies were removed"
1120
1121           queue.release()
1122           try:
1123             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1124           finally:
1125             queue.acquire(shared=1)
1126
1127           op.status = op_status
1128           op.result = op_result
1129
1130           assert not waitjob
1131
1132         if op.status == constants.OP_STATUS_WAITLOCK:
1133           # Couldn't get locks in time
1134           assert not op.end_timestamp
1135         else:
1136           # Finalize opcode
1137           op.end_timestamp = TimeStampNow()
1138
1139           if op.status == constants.OP_STATUS_CANCELING:
1140             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1141                                   for i in job.ops[opctx.index:])
1142           else:
1143             assert op.status in constants.OPS_FINALIZED
1144
1145       if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
1146         finalize = False
1147
1148         if not waitjob and opctx.CheckPriorityIncrease():
1149           # Priority was changed, need to update on-disk file
1150           queue.UpdateJobUnlocked(job)
1151
1152         # Keep around for another round
1153         job.cur_opctx = opctx
1154
1155         assert (op.priority <= constants.OP_PRIO_LOWEST and
1156                 op.priority >= constants.OP_PRIO_HIGHEST)
1157
1158         # In no case must the status be finalized here
1159         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1160
1161       else:
1162         # Ensure all opcodes so far have been successful
1163         assert (opctx.index == 0 or
1164                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1165                            for i in job.ops[:opctx.index]))
1166
1167         # Reset context
1168         job.cur_opctx = None
1169
1170         if op.status == constants.OP_STATUS_SUCCESS:
1171           finalize = False
1172
1173         elif op.status == constants.OP_STATUS_ERROR:
1174           # Ensure failed opcode has an exception as its result
1175           assert errors.GetEncodedError(job.ops[opctx.index].result)
1176
1177           to_encode = errors.OpExecError("Preceding opcode failed")
1178           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1179                                 _EncodeOpError(to_encode))
1180           finalize = True
1181
1182           # Consistency check
1183           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1184                             errors.GetEncodedError(i.result)
1185                             for i in job.ops[opctx.index:])
1186
1187         elif op.status == constants.OP_STATUS_CANCELING:
1188           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1189                                 "Job canceled by request")
1190           finalize = True
1191
1192         else:
1193           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1194
1195         if opctx.index == (opcount - 1):
1196           # Finalize on last opcode
1197           finalize = True
1198
1199         if finalize:
1200           # All opcodes have been run, finalize job
1201           job.Finalize()
1202
1203         # Write to disk. If the job status is final, this is the final write
1204         # allowed. Once the file has been written, it can be archived anytime.
1205         queue.UpdateJobUnlocked(job)
1206
1207         assert not waitjob
1208
1209         if finalize:
1210           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1211           # TODO: Check locking
1212           queue.depmgr.NotifyWaiters(job.id)
1213           return True
1214
1215       assert not waitjob or queue.depmgr.JobWaiting(job)
1216
1217       return bool(waitjob)
1218     finally:
1219       assert job.writable, "Job became read-only while being processed"
1220       queue.release()
1221
1222
1223 class _JobQueueWorker(workerpool.BaseWorker):
1224   """The actual job workers.
1225
1226   """
1227   def RunTask(self, job): # pylint: disable-msg=W0221
1228     """Job executor.
1229
1230     @type job: L{_QueuedJob}
1231     @param job: the job to be processed
1232
1233     """
1234     # Ensure only one worker is active on a single job. If a job registers for
1235     # a dependency job, and the other job notifies before the first worker is
1236     # done, the job can end up in the tasklist more than once.
1237     job.processor_lock.acquire()
1238     try:
1239       return self._RunTaskInner(job)
1240     finally:
1241       job.processor_lock.release()
1242
1243   def _RunTaskInner(self, job):
1244     """Executes a job.
1245
1246     Must be called with per-job lock acquired.
1247
1248     """
1249     queue = job.queue
1250     assert queue == self.pool.queue
1251
1252     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1253     setname_fn(None)
1254
1255     proc = mcpu.Processor(queue.context, job.id)
1256
1257     # Create wrapper for setting thread name
1258     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1259                                     proc.ExecOpCode)
1260
1261     if not _JobProcessor(queue, wrap_execop_fn, job)():
1262       # Schedule again
1263       raise workerpool.DeferTask(priority=job.CalcPriority())
1264
1265   @staticmethod
1266   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1267     """Updates the worker thread name to include a short summary of the opcode.
1268
1269     @param setname_fn: Callable setting worker thread name
1270     @param execop_fn: Callable for executing opcode (usually
1271                       L{mcpu.Processor.ExecOpCode})
1272
1273     """
1274     setname_fn(op)
1275     try:
1276       return execop_fn(op, *args, **kwargs)
1277     finally:
1278       setname_fn(None)
1279
1280   @staticmethod
1281   def _GetWorkerName(job, op):
1282     """Sets the worker thread name.
1283
1284     @type job: L{_QueuedJob}
1285     @type op: L{opcodes.OpCode}
1286
1287     """
1288     parts = ["Job%s" % job.id]
1289
1290     if op:
1291       parts.append(op.TinySummary())
1292
1293     return "/".join(parts)
1294
1295
1296 class _JobQueueWorkerPool(workerpool.WorkerPool):
1297   """Simple class implementing a job-processing workerpool.
1298
1299   """
1300   def __init__(self, queue):
1301     super(_JobQueueWorkerPool, self).__init__("Jq",
1302                                               JOBQUEUE_THREADS,
1303                                               _JobQueueWorker)
1304     self.queue = queue
1305
1306
1307 class _JobDependencyManager:
1308   """Keeps track of job dependencies.
1309
1310   """
1311   (WAIT,
1312    ERROR,
1313    CANCEL,
1314    CONTINUE,
1315    WRONGSTATUS) = range(1, 6)
1316
1317   # TODO: Export waiter information to lock monitor
1318
1319   def __init__(self, getstatus_fn, enqueue_fn):
1320     """Initializes this class.
1321
1322     """
1323     self._getstatus_fn = getstatus_fn
1324     self._enqueue_fn = enqueue_fn
1325
1326     self._waiters = {}
1327     self._lock = locking.SharedLock("JobDepMgr")
1328
1329   @locking.ssynchronized(_LOCK, shared=1)
1330   def JobWaiting(self, job):
1331     """Checks if a job is waiting.
1332
1333     """
1334     return compat.any(job in jobs
1335                       for jobs in self._waiters.values())
1336
1337   @locking.ssynchronized(_LOCK)
1338   def CheckAndRegister(self, job, dep_job_id, dep_status):
1339     """Checks if a dependency job has the requested status.
1340
1341     If the other job is not yet in a finalized status, the calling job will be
1342     notified (re-added to the workerpool) at a later point.
1343
1344     @type job: L{_QueuedJob}
1345     @param job: Job object
1346     @type dep_job_id: string
1347     @param dep_job_id: ID of dependency job
1348     @type dep_status: list
1349     @param dep_status: Required status
1350
1351     """
1352     assert ht.TString(job.id)
1353     assert ht.TString(dep_job_id)
1354     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1355
1356     if job.id == dep_job_id:
1357       return (self.ERROR, "Job can't depend on itself")
1358
1359     # Get status of dependency job
1360     try:
1361       status = self._getstatus_fn(dep_job_id)
1362     except errors.JobLost, err:
1363       return (self.ERROR, "Dependency error: %s" % err)
1364
1365     assert status in constants.JOB_STATUS_ALL
1366
1367     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1368
1369     if status not in constants.JOBS_FINALIZED:
1370       # Register for notification and wait for job to finish
1371       job_id_waiters.add(job)
1372       return (self.WAIT,
1373               "Need to wait for job %s, wanted status '%s'" %
1374               (dep_job_id, dep_status))
1375
1376     # Remove from waiters list
1377     if job in job_id_waiters:
1378       job_id_waiters.remove(job)
1379
1380     if (status == constants.JOB_STATUS_CANCELED and
1381         constants.JOB_STATUS_CANCELED not in dep_status):
1382       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1383
1384     elif not dep_status or status in dep_status:
1385       return (self.CONTINUE,
1386               "Dependency job %s finished with status '%s'" %
1387               (dep_job_id, status))
1388
1389     else:
1390       return (self.WRONGSTATUS,
1391               "Dependency job %s finished with status '%s',"
1392               " not one of '%s' as required" %
1393               (dep_job_id, status, utils.CommaJoin(dep_status)))
1394
1395   @locking.ssynchronized(_LOCK)
1396   def NotifyWaiters(self, job_id):
1397     """Notifies all jobs waiting for a certain job ID.
1398
1399     @type job_id: string
1400     @param job_id: Job ID
1401
1402     """
1403     assert ht.TString(job_id)
1404
1405     jobs = self._waiters.pop(job_id, None)
1406     if jobs:
1407       # Re-add jobs to workerpool
1408       logging.debug("Re-adding %s jobs which were waiting for job %s",
1409                     len(jobs), job_id)
1410       self._enqueue_fn(jobs)
1411
1412     # Remove all jobs without actual waiters
1413     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1414                    if not waiters]:
1415       del self._waiters[job_id]
1416
1417
1418 def _RequireOpenQueue(fn):
1419   """Decorator for "public" functions.
1420
1421   This function should be used for all 'public' functions. That is,
1422   functions usually called from other classes. Note that this should
1423   be applied only to methods (not plain functions), since it expects
1424   that the decorated function is called with a first argument that has
1425   a '_queue_filelock' argument.
1426
1427   @warning: Use this decorator only after locking.ssynchronized
1428
1429   Example::
1430     @locking.ssynchronized(_LOCK)
1431     @_RequireOpenQueue
1432     def Example(self):
1433       pass
1434
1435   """
1436   def wrapper(self, *args, **kwargs):
1437     # pylint: disable-msg=W0212
1438     assert self._queue_filelock is not None, "Queue should be open"
1439     return fn(self, *args, **kwargs)
1440   return wrapper
1441
1442
1443 class JobQueue(object):
1444   """Queue used to manage the jobs.
1445
1446   @cvar _RE_JOB_FILE: regex matching the valid job file names
1447
1448   """
1449   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1450
1451   def __init__(self, context):
1452     """Constructor for JobQueue.
1453
1454     The constructor will initialize the job queue object and then
1455     start loading the current jobs from disk, either for starting them
1456     (if they were queue) or for aborting them (if they were already
1457     running).
1458
1459     @type context: GanetiContext
1460     @param context: the context object for access to the configuration
1461         data and other ganeti objects
1462
1463     """
1464     self.context = context
1465     self._memcache = weakref.WeakValueDictionary()
1466     self._my_hostname = netutils.Hostname.GetSysName()
1467
1468     # The Big JobQueue lock. If a code block or method acquires it in shared
1469     # mode safe it must guarantee concurrency with all the code acquiring it in
1470     # shared mode, including itself. In order not to acquire it at all
1471     # concurrency must be guaranteed with all code acquiring it in shared mode
1472     # and all code acquiring it exclusively.
1473     self._lock = locking.SharedLock("JobQueue")
1474
1475     self.acquire = self._lock.acquire
1476     self.release = self._lock.release
1477
1478     # Initialize the queue, and acquire the filelock.
1479     # This ensures no other process is working on the job queue.
1480     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1481
1482     # Read serial file
1483     self._last_serial = jstore.ReadSerial()
1484     assert self._last_serial is not None, ("Serial file was modified between"
1485                                            " check in jstore and here")
1486
1487     # Get initial list of nodes
1488     self._nodes = dict((n.name, n.primary_ip)
1489                        for n in self.context.cfg.GetAllNodesInfo().values()
1490                        if n.master_candidate)
1491
1492     # Remove master node
1493     self._nodes.pop(self._my_hostname, None)
1494
1495     # TODO: Check consistency across nodes
1496
1497     self._queue_size = 0
1498     self._UpdateQueueSizeUnlocked()
1499     self._drained = jstore.CheckDrainFlag()
1500
1501     # Job dependencies
1502     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1503                                         self._EnqueueJobs)
1504
1505     # Setup worker pool
1506     self._wpool = _JobQueueWorkerPool(self)
1507     try:
1508       self._InspectQueue()
1509     except:
1510       self._wpool.TerminateWorkers()
1511       raise
1512
1513   @locking.ssynchronized(_LOCK)
1514   @_RequireOpenQueue
1515   def _InspectQueue(self):
1516     """Loads the whole job queue and resumes unfinished jobs.
1517
1518     This function needs the lock here because WorkerPool.AddTask() may start a
1519     job while we're still doing our work.
1520
1521     """
1522     logging.info("Inspecting job queue")
1523
1524     restartjobs = []
1525
1526     all_job_ids = self._GetJobIDsUnlocked()
1527     jobs_count = len(all_job_ids)
1528     lastinfo = time.time()
1529     for idx, job_id in enumerate(all_job_ids):
1530       # Give an update every 1000 jobs or 10 seconds
1531       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1532           idx == (jobs_count - 1)):
1533         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1534                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1535         lastinfo = time.time()
1536
1537       job = self._LoadJobUnlocked(job_id)
1538
1539       # a failure in loading the job can cause 'None' to be returned
1540       if job is None:
1541         continue
1542
1543       status = job.CalcStatus()
1544
1545       if status == constants.JOB_STATUS_QUEUED:
1546         restartjobs.append(job)
1547
1548       elif status in (constants.JOB_STATUS_RUNNING,
1549                       constants.JOB_STATUS_WAITLOCK,
1550                       constants.JOB_STATUS_CANCELING):
1551         logging.warning("Unfinished job %s found: %s", job.id, job)
1552
1553         if status == constants.JOB_STATUS_WAITLOCK:
1554           # Restart job
1555           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1556           restartjobs.append(job)
1557         else:
1558           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1559                                 "Unclean master daemon shutdown")
1560           job.Finalize()
1561
1562         self.UpdateJobUnlocked(job)
1563
1564     if restartjobs:
1565       logging.info("Restarting %s jobs", len(restartjobs))
1566       self._EnqueueJobs(restartjobs)
1567
1568     logging.info("Job queue inspection finished")
1569
1570   @locking.ssynchronized(_LOCK)
1571   @_RequireOpenQueue
1572   def AddNode(self, node):
1573     """Register a new node with the queue.
1574
1575     @type node: L{objects.Node}
1576     @param node: the node object to be added
1577
1578     """
1579     node_name = node.name
1580     assert node_name != self._my_hostname
1581
1582     # Clean queue directory on added node
1583     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1584     msg = result.fail_msg
1585     if msg:
1586       logging.warning("Cannot cleanup queue directory on node %s: %s",
1587                       node_name, msg)
1588
1589     if not node.master_candidate:
1590       # remove if existing, ignoring errors
1591       self._nodes.pop(node_name, None)
1592       # and skip the replication of the job ids
1593       return
1594
1595     # Upload the whole queue excluding archived jobs
1596     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1597
1598     # Upload current serial file
1599     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1600
1601     for file_name in files:
1602       # Read file content
1603       content = utils.ReadFile(file_name)
1604
1605       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1606                                                   [node.primary_ip],
1607                                                   file_name, content)
1608       msg = result[node_name].fail_msg
1609       if msg:
1610         logging.error("Failed to upload file %s to node %s: %s",
1611                       file_name, node_name, msg)
1612
1613     self._nodes[node_name] = node.primary_ip
1614
1615   @locking.ssynchronized(_LOCK)
1616   @_RequireOpenQueue
1617   def RemoveNode(self, node_name):
1618     """Callback called when removing nodes from the cluster.
1619
1620     @type node_name: str
1621     @param node_name: the name of the node to remove
1622
1623     """
1624     self._nodes.pop(node_name, None)
1625
1626   @staticmethod
1627   def _CheckRpcResult(result, nodes, failmsg):
1628     """Verifies the status of an RPC call.
1629
1630     Since we aim to keep consistency should this node (the current
1631     master) fail, we will log errors if our rpc fail, and especially
1632     log the case when more than half of the nodes fails.
1633
1634     @param result: the data as returned from the rpc call
1635     @type nodes: list
1636     @param nodes: the list of nodes we made the call to
1637     @type failmsg: str
1638     @param failmsg: the identifier to be used for logging
1639
1640     """
1641     failed = []
1642     success = []
1643
1644     for node in nodes:
1645       msg = result[node].fail_msg
1646       if msg:
1647         failed.append(node)
1648         logging.error("RPC call %s (%s) failed on node %s: %s",
1649                       result[node].call, failmsg, node, msg)
1650       else:
1651         success.append(node)
1652
1653     # +1 for the master node
1654     if (len(success) + 1) < len(failed):
1655       # TODO: Handle failing nodes
1656       logging.error("More than half of the nodes failed")
1657
1658   def _GetNodeIp(self):
1659     """Helper for returning the node name/ip list.
1660
1661     @rtype: (list, list)
1662     @return: a tuple of two lists, the first one with the node
1663         names and the second one with the node addresses
1664
1665     """
1666     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1667     name_list = self._nodes.keys()
1668     addr_list = [self._nodes[name] for name in name_list]
1669     return name_list, addr_list
1670
1671   def _UpdateJobQueueFile(self, file_name, data, replicate):
1672     """Writes a file locally and then replicates it to all nodes.
1673
1674     This function will replace the contents of a file on the local
1675     node and then replicate it to all the other nodes we have.
1676
1677     @type file_name: str
1678     @param file_name: the path of the file to be replicated
1679     @type data: str
1680     @param data: the new contents of the file
1681     @type replicate: boolean
1682     @param replicate: whether to spread the changes to the remote nodes
1683
1684     """
1685     getents = runtime.GetEnts()
1686     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1687                     gid=getents.masterd_gid)
1688
1689     if replicate:
1690       names, addrs = self._GetNodeIp()
1691       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1692       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1693
1694   def _RenameFilesUnlocked(self, rename):
1695     """Renames a file locally and then replicate the change.
1696
1697     This function will rename a file in the local queue directory
1698     and then replicate this rename to all the other nodes we have.
1699
1700     @type rename: list of (old, new)
1701     @param rename: List containing tuples mapping old to new names
1702
1703     """
1704     # Rename them locally
1705     for old, new in rename:
1706       utils.RenameFile(old, new, mkdir=True)
1707
1708     # ... and on all nodes
1709     names, addrs = self._GetNodeIp()
1710     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1711     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1712
1713   @staticmethod
1714   def _FormatJobID(job_id):
1715     """Convert a job ID to string format.
1716
1717     Currently this just does C{str(job_id)} after performing some
1718     checks, but if we want to change the job id format this will
1719     abstract this change.
1720
1721     @type job_id: int or long
1722     @param job_id: the numeric job id
1723     @rtype: str
1724     @return: the formatted job id
1725
1726     """
1727     if not isinstance(job_id, (int, long)):
1728       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1729     if job_id < 0:
1730       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1731
1732     return str(job_id)
1733
1734   @classmethod
1735   def _GetArchiveDirectory(cls, job_id):
1736     """Returns the archive directory for a job.
1737
1738     @type job_id: str
1739     @param job_id: Job identifier
1740     @rtype: str
1741     @return: Directory name
1742
1743     """
1744     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1745
1746   def _NewSerialsUnlocked(self, count):
1747     """Generates a new job identifier.
1748
1749     Job identifiers are unique during the lifetime of a cluster.
1750
1751     @type count: integer
1752     @param count: how many serials to return
1753     @rtype: str
1754     @return: a string representing the job identifier.
1755
1756     """
1757     assert count > 0
1758     # New number
1759     serial = self._last_serial + count
1760
1761     # Write to file
1762     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1763                              "%s\n" % serial, True)
1764
1765     result = [self._FormatJobID(v)
1766               for v in range(self._last_serial + 1, serial + 1)]
1767
1768     # Keep it only if we were able to write the file
1769     self._last_serial = serial
1770
1771     assert len(result) == count
1772
1773     return result
1774
1775   @staticmethod
1776   def _GetJobPath(job_id):
1777     """Returns the job file for a given job id.
1778
1779     @type job_id: str
1780     @param job_id: the job identifier
1781     @rtype: str
1782     @return: the path to the job file
1783
1784     """
1785     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1786
1787   @classmethod
1788   def _GetArchivedJobPath(cls, job_id):
1789     """Returns the archived job file for a give job id.
1790
1791     @type job_id: str
1792     @param job_id: the job identifier
1793     @rtype: str
1794     @return: the path to the archived job file
1795
1796     """
1797     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1798                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1799
1800   def _GetJobIDsUnlocked(self, sort=True):
1801     """Return all known job IDs.
1802
1803     The method only looks at disk because it's a requirement that all
1804     jobs are present on disk (so in the _memcache we don't have any
1805     extra IDs).
1806
1807     @type sort: boolean
1808     @param sort: perform sorting on the returned job ids
1809     @rtype: list
1810     @return: the list of job IDs
1811
1812     """
1813     jlist = []
1814     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1815       m = self._RE_JOB_FILE.match(filename)
1816       if m:
1817         jlist.append(m.group(1))
1818     if sort:
1819       jlist = utils.NiceSort(jlist)
1820     return jlist
1821
1822   def _LoadJobUnlocked(self, job_id):
1823     """Loads a job from the disk or memory.
1824
1825     Given a job id, this will return the cached job object if
1826     existing, or try to load the job from the disk. If loading from
1827     disk, it will also add the job to the cache.
1828
1829     @param job_id: the job id
1830     @rtype: L{_QueuedJob} or None
1831     @return: either None or the job object
1832
1833     """
1834     job = self._memcache.get(job_id, None)
1835     if job:
1836       logging.debug("Found job %s in memcache", job_id)
1837       assert job.writable, "Found read-only job in memcache"
1838       return job
1839
1840     try:
1841       job = self._LoadJobFromDisk(job_id, False)
1842       if job is None:
1843         return job
1844     except errors.JobFileCorrupted:
1845       old_path = self._GetJobPath(job_id)
1846       new_path = self._GetArchivedJobPath(job_id)
1847       if old_path == new_path:
1848         # job already archived (future case)
1849         logging.exception("Can't parse job %s", job_id)
1850       else:
1851         # non-archived case
1852         logging.exception("Can't parse job %s, will archive.", job_id)
1853         self._RenameFilesUnlocked([(old_path, new_path)])
1854       return None
1855
1856     assert job.writable, "Job just loaded is not writable"
1857
1858     self._memcache[job_id] = job
1859     logging.debug("Added job %s to the cache", job_id)
1860     return job
1861
1862   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1863     """Load the given job file from disk.
1864
1865     Given a job file, read, load and restore it in a _QueuedJob format.
1866
1867     @type job_id: string
1868     @param job_id: job identifier
1869     @type try_archived: bool
1870     @param try_archived: Whether to try loading an archived job
1871     @rtype: L{_QueuedJob} or None
1872     @return: either None or the job object
1873
1874     """
1875     path_functions = [(self._GetJobPath, True)]
1876
1877     if try_archived:
1878       path_functions.append((self._GetArchivedJobPath, False))
1879
1880     raw_data = None
1881     writable_default = None
1882
1883     for (fn, writable_default) in path_functions:
1884       filepath = fn(job_id)
1885       logging.debug("Loading job from %s", filepath)
1886       try:
1887         raw_data = utils.ReadFile(filepath)
1888       except EnvironmentError, err:
1889         if err.errno != errno.ENOENT:
1890           raise
1891       else:
1892         break
1893
1894     if not raw_data:
1895       return None
1896
1897     if writable is None:
1898       writable = writable_default
1899
1900     try:
1901       data = serializer.LoadJson(raw_data)
1902       job = _QueuedJob.Restore(self, data, writable)
1903     except Exception, err: # pylint: disable-msg=W0703
1904       raise errors.JobFileCorrupted(err)
1905
1906     return job
1907
1908   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1909     """Load the given job file from disk.
1910
1911     Given a job file, read, load and restore it in a _QueuedJob format.
1912     In case of error reading the job, it gets returned as None, and the
1913     exception is logged.
1914
1915     @type job_id: string
1916     @param job_id: job identifier
1917     @type try_archived: bool
1918     @param try_archived: Whether to try loading an archived job
1919     @rtype: L{_QueuedJob} or None
1920     @return: either None or the job object
1921
1922     """
1923     try:
1924       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1925     except (errors.JobFileCorrupted, EnvironmentError):
1926       logging.exception("Can't load/parse job %s", job_id)
1927       return None
1928
1929   def _UpdateQueueSizeUnlocked(self):
1930     """Update the queue size.
1931
1932     """
1933     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1934
1935   @locking.ssynchronized(_LOCK)
1936   @_RequireOpenQueue
1937   def SetDrainFlag(self, drain_flag):
1938     """Sets the drain flag for the queue.
1939
1940     @type drain_flag: boolean
1941     @param drain_flag: Whether to set or unset the drain flag
1942
1943     """
1944     jstore.SetDrainFlag(drain_flag)
1945
1946     self._drained = drain_flag
1947
1948     return True
1949
1950   @_RequireOpenQueue
1951   def _SubmitJobUnlocked(self, job_id, ops):
1952     """Create and store a new job.
1953
1954     This enters the job into our job queue and also puts it on the new
1955     queue, in order for it to be picked up by the queue processors.
1956
1957     @type job_id: job ID
1958     @param job_id: the job ID for the new job
1959     @type ops: list
1960     @param ops: The list of OpCodes that will become the new job.
1961     @rtype: L{_QueuedJob}
1962     @return: the job object to be queued
1963     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1964     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1965     @raise errors.GenericError: If an opcode is not valid
1966
1967     """
1968     # Ok when sharing the big job queue lock, as the drain file is created when
1969     # the lock is exclusive.
1970     if self._drained:
1971       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1972
1973     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1974       raise errors.JobQueueFull()
1975
1976     job = _QueuedJob(self, job_id, ops, True)
1977
1978     # Check priority
1979     for idx, op in enumerate(job.ops):
1980       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1981         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1982         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1983                                   " are %s" % (idx, op.priority, allowed))
1984
1985     # Write to disk
1986     self.UpdateJobUnlocked(job)
1987
1988     self._queue_size += 1
1989
1990     logging.debug("Adding new job %s to the cache", job_id)
1991     self._memcache[job_id] = job
1992
1993     return job
1994
1995   @locking.ssynchronized(_LOCK)
1996   @_RequireOpenQueue
1997   def SubmitJob(self, ops):
1998     """Create and store a new job.
1999
2000     @see: L{_SubmitJobUnlocked}
2001
2002     """
2003     job_id = self._NewSerialsUnlocked(1)[0]
2004     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
2005     return job_id
2006
2007   @locking.ssynchronized(_LOCK)
2008   @_RequireOpenQueue
2009   def SubmitManyJobs(self, jobs):
2010     """Create and store multiple jobs.
2011
2012     @see: L{_SubmitJobUnlocked}
2013
2014     """
2015     results = []
2016     added_jobs = []
2017     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2018     for job_id, ops in zip(all_job_ids, jobs):
2019       try:
2020         added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
2021         status = True
2022         data = job_id
2023       except errors.GenericError, err:
2024         data = ("%s; opcodes %s" %
2025                 (err, utils.CommaJoin(op.Summary() for op in ops)))
2026         status = False
2027       results.append((status, data))
2028
2029     self._EnqueueJobs(added_jobs)
2030
2031     return results
2032
2033   def _EnqueueJobs(self, jobs):
2034     """Helper function to add jobs to worker pool's queue.
2035
2036     @type jobs: list
2037     @param jobs: List of all jobs
2038
2039     """
2040     self._wpool.AddManyTasks([(job, ) for job in jobs],
2041                              priority=[job.CalcPriority() for job in jobs])
2042
2043   def _GetJobStatusForDependencies(self, job_id):
2044     """Gets the status of a job for dependencies.
2045
2046     @type job_id: string
2047     @param job_id: Job ID
2048     @raise errors.JobLost: If job can't be found
2049
2050     """
2051     if not isinstance(job_id, basestring):
2052       job_id = self._FormatJobID(job_id)
2053
2054     # Not using in-memory cache as doing so would require an exclusive lock
2055
2056     # Try to load from disk
2057     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2058
2059     assert not job.writable, "Got writable job"
2060
2061     if job:
2062       return job.CalcStatus()
2063
2064     raise errors.JobLost("Job %s not found" % job_id)
2065
2066   @_RequireOpenQueue
2067   def UpdateJobUnlocked(self, job, replicate=True):
2068     """Update a job's on disk storage.
2069
2070     After a job has been modified, this function needs to be called in
2071     order to write the changes to disk and replicate them to the other
2072     nodes.
2073
2074     @type job: L{_QueuedJob}
2075     @param job: the changed job
2076     @type replicate: boolean
2077     @param replicate: whether to replicate the change to remote nodes
2078
2079     """
2080     if __debug__:
2081       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2082       assert (finalized ^ (job.end_timestamp is None))
2083       assert job.writable, "Can't update read-only job"
2084
2085     filename = self._GetJobPath(job.id)
2086     data = serializer.DumpJson(job.Serialize(), indent=False)
2087     logging.debug("Writing job %s to %s", job.id, filename)
2088     self._UpdateJobQueueFile(filename, data, replicate)
2089
2090   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2091                         timeout):
2092     """Waits for changes in a job.
2093
2094     @type job_id: string
2095     @param job_id: Job identifier
2096     @type fields: list of strings
2097     @param fields: Which fields to check for changes
2098     @type prev_job_info: list or None
2099     @param prev_job_info: Last job information returned
2100     @type prev_log_serial: int
2101     @param prev_log_serial: Last job message serial number
2102     @type timeout: float
2103     @param timeout: maximum time to wait in seconds
2104     @rtype: tuple (job info, log entries)
2105     @return: a tuple of the job information as required via
2106         the fields parameter, and the log entries as a list
2107
2108         if the job has not changed and the timeout has expired,
2109         we instead return a special value,
2110         L{constants.JOB_NOTCHANGED}, which should be interpreted
2111         as such by the clients
2112
2113     """
2114     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2115                              writable=False)
2116
2117     helper = _WaitForJobChangesHelper()
2118
2119     return helper(self._GetJobPath(job_id), load_fn,
2120                   fields, prev_job_info, prev_log_serial, timeout)
2121
2122   @locking.ssynchronized(_LOCK)
2123   @_RequireOpenQueue
2124   def CancelJob(self, job_id):
2125     """Cancels a job.
2126
2127     This will only succeed if the job has not started yet.
2128
2129     @type job_id: string
2130     @param job_id: job ID of job to be cancelled.
2131
2132     """
2133     logging.info("Cancelling job %s", job_id)
2134
2135     job = self._LoadJobUnlocked(job_id)
2136     if not job:
2137       logging.debug("Job %s not found", job_id)
2138       return (False, "Job %s not found" % job_id)
2139
2140     assert job.writable, "Can't cancel read-only job"
2141
2142     (success, msg) = job.Cancel()
2143
2144     if success:
2145       # If the job was finalized (e.g. cancelled), this is the final write
2146       # allowed. The job can be archived anytime.
2147       self.UpdateJobUnlocked(job)
2148
2149     return (success, msg)
2150
2151   @_RequireOpenQueue
2152   def _ArchiveJobsUnlocked(self, jobs):
2153     """Archives jobs.
2154
2155     @type jobs: list of L{_QueuedJob}
2156     @param jobs: Job objects
2157     @rtype: int
2158     @return: Number of archived jobs
2159
2160     """
2161     archive_jobs = []
2162     rename_files = []
2163     for job in jobs:
2164       assert job.writable, "Can't archive read-only job"
2165
2166       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2167         logging.debug("Job %s is not yet done", job.id)
2168         continue
2169
2170       archive_jobs.append(job)
2171
2172       old = self._GetJobPath(job.id)
2173       new = self._GetArchivedJobPath(job.id)
2174       rename_files.append((old, new))
2175
2176     # TODO: What if 1..n files fail to rename?
2177     self._RenameFilesUnlocked(rename_files)
2178
2179     logging.debug("Successfully archived job(s) %s",
2180                   utils.CommaJoin(job.id for job in archive_jobs))
2181
2182     # Since we haven't quite checked, above, if we succeeded or failed renaming
2183     # the files, we update the cached queue size from the filesystem. When we
2184     # get around to fix the TODO: above, we can use the number of actually
2185     # archived jobs to fix this.
2186     self._UpdateQueueSizeUnlocked()
2187     return len(archive_jobs)
2188
2189   @locking.ssynchronized(_LOCK)
2190   @_RequireOpenQueue
2191   def ArchiveJob(self, job_id):
2192     """Archives a job.
2193
2194     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2195
2196     @type job_id: string
2197     @param job_id: Job ID of job to be archived.
2198     @rtype: bool
2199     @return: Whether job was archived
2200
2201     """
2202     logging.info("Archiving job %s", job_id)
2203
2204     job = self._LoadJobUnlocked(job_id)
2205     if not job:
2206       logging.debug("Job %s not found", job_id)
2207       return False
2208
2209     return self._ArchiveJobsUnlocked([job]) == 1
2210
2211   @locking.ssynchronized(_LOCK)
2212   @_RequireOpenQueue
2213   def AutoArchiveJobs(self, age, timeout):
2214     """Archives all jobs based on age.
2215
2216     The method will archive all jobs which are older than the age
2217     parameter. For jobs that don't have an end timestamp, the start
2218     timestamp will be considered. The special '-1' age will cause
2219     archival of all jobs (that are not running or queued).
2220
2221     @type age: int
2222     @param age: the minimum age in seconds
2223
2224     """
2225     logging.info("Archiving jobs with age more than %s seconds", age)
2226
2227     now = time.time()
2228     end_time = now + timeout
2229     archived_count = 0
2230     last_touched = 0
2231
2232     all_job_ids = self._GetJobIDsUnlocked()
2233     pending = []
2234     for idx, job_id in enumerate(all_job_ids):
2235       last_touched = idx + 1
2236
2237       # Not optimal because jobs could be pending
2238       # TODO: Measure average duration for job archival and take number of
2239       # pending jobs into account.
2240       if time.time() > end_time:
2241         break
2242
2243       # Returns None if the job failed to load
2244       job = self._LoadJobUnlocked(job_id)
2245       if job:
2246         if job.end_timestamp is None:
2247           if job.start_timestamp is None:
2248             job_age = job.received_timestamp
2249           else:
2250             job_age = job.start_timestamp
2251         else:
2252           job_age = job.end_timestamp
2253
2254         if age == -1 or now - job_age[0] > age:
2255           pending.append(job)
2256
2257           # Archive 10 jobs at a time
2258           if len(pending) >= 10:
2259             archived_count += self._ArchiveJobsUnlocked(pending)
2260             pending = []
2261
2262     if pending:
2263       archived_count += self._ArchiveJobsUnlocked(pending)
2264
2265     return (archived_count, len(all_job_ids) - last_touched)
2266
2267   def QueryJobs(self, job_ids, fields):
2268     """Returns a list of jobs in queue.
2269
2270     @type job_ids: list
2271     @param job_ids: sequence of job identifiers or None for all
2272     @type fields: list
2273     @param fields: names of fields to return
2274     @rtype: list
2275     @return: list one element per job, each element being list with
2276         the requested fields
2277
2278     """
2279     jobs = []
2280     list_all = False
2281     if not job_ids:
2282       # Since files are added to/removed from the queue atomically, there's no
2283       # risk of getting the job ids in an inconsistent state.
2284       job_ids = self._GetJobIDsUnlocked()
2285       list_all = True
2286
2287     for job_id in job_ids:
2288       job = self.SafeLoadJobFromDisk(job_id, True)
2289       if job is not None:
2290         jobs.append(job.GetInfo(fields))
2291       elif not list_all:
2292         jobs.append(None)
2293
2294     return jobs
2295
2296   @locking.ssynchronized(_LOCK)
2297   @_RequireOpenQueue
2298   def Shutdown(self):
2299     """Stops the job queue.
2300
2301     This shutdowns all the worker threads an closes the queue.
2302
2303     """
2304     self._wpool.TerminateWorkers()
2305
2306     self._queue_filelock.Close()
2307     self._queue_filelock = None