Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b91a34a5

History | View | Annotate | Download (19.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
49 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
50 e2715f69 Michael Hanselmann
51 498ae1cc Iustin Pop
52 70552c46 Michael Hanselmann
def TimeStampNow():
53 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
54 70552c46 Michael Hanselmann
55 70552c46 Michael Hanselmann
56 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
57 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
58 e2715f69 Michael Hanselmann

59 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
60 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
61 f1048938 Iustin Pop

62 e2715f69 Michael Hanselmann
  """
63 85f03e0d Michael Hanselmann
  def __init__(self, op):
64 85f03e0d Michael Hanselmann
    self.input = op
65 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
66 85f03e0d Michael Hanselmann
    self.result = None
67 85f03e0d Michael Hanselmann
    self.log = []
68 70552c46 Michael Hanselmann
    self.start_timestamp = None
69 70552c46 Michael Hanselmann
    self.end_timestamp = None
70 f1da30e6 Michael Hanselmann
71 f1da30e6 Michael Hanselmann
  @classmethod
72 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
73 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
74 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75 85f03e0d Michael Hanselmann
    obj.status = state["status"]
76 85f03e0d Michael Hanselmann
    obj.result = state["result"]
77 85f03e0d Michael Hanselmann
    obj.log = state["log"]
78 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
79 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
80 f1da30e6 Michael Hanselmann
    return obj
81 f1da30e6 Michael Hanselmann
82 f1da30e6 Michael Hanselmann
  def Serialize(self):
83 6c5a7090 Michael Hanselmann
    return {
84 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
85 6c5a7090 Michael Hanselmann
      "status": self.status,
86 6c5a7090 Michael Hanselmann
      "result": self.result,
87 6c5a7090 Michael Hanselmann
      "log": self.log,
88 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
89 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
90 6c5a7090 Michael Hanselmann
      }
91 f1048938 Iustin Pop
92 e2715f69 Michael Hanselmann
93 e2715f69 Michael Hanselmann
class _QueuedJob(object):
94 e2715f69 Michael Hanselmann
  """In-memory job representation.
95 e2715f69 Michael Hanselmann

96 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
97 6c5a7090 Michael Hanselmann
  care of by users of this class.
98 e2715f69 Michael Hanselmann

99 e2715f69 Michael Hanselmann
  """
100 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
101 e2715f69 Michael Hanselmann
    if not ops:
102 e2715f69 Michael Hanselmann
      # TODO
103 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
104 e2715f69 Michael Hanselmann
105 85f03e0d Michael Hanselmann
    self.queue = queue
106 f1da30e6 Michael Hanselmann
    self.id = job_id
107 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
108 85f03e0d Michael Hanselmann
    self.run_op_index = -1
109 6c5a7090 Michael Hanselmann
    self.log_serial = 0
110 6c5a7090 Michael Hanselmann
111 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
112 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
113 f1da30e6 Michael Hanselmann
114 f1da30e6 Michael Hanselmann
  @classmethod
115 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
116 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
117 85f03e0d Michael Hanselmann
    obj.queue = queue
118 85f03e0d Michael Hanselmann
    obj.id = state["id"]
119 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
120 6c5a7090 Michael Hanselmann
121 6c5a7090 Michael Hanselmann
    obj.ops = []
122 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
123 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
124 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
125 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
126 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
127 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
128 6c5a7090 Michael Hanselmann
129 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
130 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
131 6c5a7090 Michael Hanselmann
132 f1da30e6 Michael Hanselmann
    return obj
133 f1da30e6 Michael Hanselmann
134 f1da30e6 Michael Hanselmann
  def Serialize(self):
135 f1da30e6 Michael Hanselmann
    return {
136 f1da30e6 Michael Hanselmann
      "id": self.id,
137 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
138 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
139 f1da30e6 Michael Hanselmann
      }
140 f1da30e6 Michael Hanselmann
141 85f03e0d Michael Hanselmann
  def CalcStatus(self):
142 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
143 e2715f69 Michael Hanselmann
144 e2715f69 Michael Hanselmann
    all_success = True
145 85f03e0d Michael Hanselmann
    for op in self.ops:
146 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
147 e2715f69 Michael Hanselmann
        continue
148 e2715f69 Michael Hanselmann
149 e2715f69 Michael Hanselmann
      all_success = False
150 e2715f69 Michael Hanselmann
151 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
152 e2715f69 Michael Hanselmann
        pass
153 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
154 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
155 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
156 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
157 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
158 f1da30e6 Michael Hanselmann
        break
159 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
160 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
161 4cb1d919 Michael Hanselmann
        break
162 e2715f69 Michael Hanselmann
163 e2715f69 Michael Hanselmann
    if all_success:
164 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
165 e2715f69 Michael Hanselmann
166 e2715f69 Michael Hanselmann
    return status
167 e2715f69 Michael Hanselmann
168 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
169 6c5a7090 Michael Hanselmann
    if newer_than is None:
170 6c5a7090 Michael Hanselmann
      serial = -1
171 6c5a7090 Michael Hanselmann
    else:
172 6c5a7090 Michael Hanselmann
      serial = newer_than
173 6c5a7090 Michael Hanselmann
174 6c5a7090 Michael Hanselmann
    entries = []
175 6c5a7090 Michael Hanselmann
    for op in self.ops:
176 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
177 6c5a7090 Michael Hanselmann
178 6c5a7090 Michael Hanselmann
    return entries
179 6c5a7090 Michael Hanselmann
180 f1048938 Iustin Pop
181 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
182 85f03e0d Michael Hanselmann
  def RunTask(self, job):
183 e2715f69 Michael Hanselmann
    """Job executor.
184 e2715f69 Michael Hanselmann

185 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
186 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
187 e2715f69 Michael Hanselmann

188 e2715f69 Michael Hanselmann
    """
189 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
190 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
191 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
192 85f03e0d Michael Hanselmann
    queue = job.queue
193 e2715f69 Michael Hanselmann
    try:
194 85f03e0d Michael Hanselmann
      try:
195 85f03e0d Michael Hanselmann
        count = len(job.ops)
196 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
197 85f03e0d Michael Hanselmann
          try:
198 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
199 85f03e0d Michael Hanselmann
200 85f03e0d Michael Hanselmann
            queue.acquire()
201 85f03e0d Michael Hanselmann
            try:
202 85f03e0d Michael Hanselmann
              job.run_op_index = idx
203 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
204 85f03e0d Michael Hanselmann
              op.result = None
205 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
206 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
207 85f03e0d Michael Hanselmann
208 38206f3c Iustin Pop
              input_opcode = op.input
209 85f03e0d Michael Hanselmann
            finally:
210 85f03e0d Michael Hanselmann
              queue.release()
211 85f03e0d Michael Hanselmann
212 dfe57c22 Michael Hanselmann
            def _Log(*args):
213 6c5a7090 Michael Hanselmann
              """Append a log entry.
214 6c5a7090 Michael Hanselmann

215 6c5a7090 Michael Hanselmann
              """
216 6c5a7090 Michael Hanselmann
              assert len(args) < 3
217 6c5a7090 Michael Hanselmann
218 6c5a7090 Michael Hanselmann
              if len(args) == 1:
219 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
220 6c5a7090 Michael Hanselmann
                log_msg = args[0]
221 6c5a7090 Michael Hanselmann
              else:
222 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
223 6c5a7090 Michael Hanselmann
224 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
225 6c5a7090 Michael Hanselmann
              # precision.
226 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
227 dfe57c22 Michael Hanselmann
228 6c5a7090 Michael Hanselmann
              queue.acquire()
229 dfe57c22 Michael Hanselmann
              try:
230 6c5a7090 Michael Hanselmann
                job.log_serial += 1
231 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
232 6c5a7090 Michael Hanselmann
233 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
234 dfe57c22 Michael Hanselmann
              finally:
235 6c5a7090 Michael Hanselmann
                queue.release()
236 dfe57c22 Michael Hanselmann
237 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
238 dfe57c22 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode, _Log)
239 85f03e0d Michael Hanselmann
240 85f03e0d Michael Hanselmann
            queue.acquire()
241 85f03e0d Michael Hanselmann
            try:
242 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
243 85f03e0d Michael Hanselmann
              op.result = result
244 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
245 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
246 85f03e0d Michael Hanselmann
            finally:
247 85f03e0d Michael Hanselmann
              queue.release()
248 85f03e0d Michael Hanselmann
249 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
250 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
251 85f03e0d Michael Hanselmann
          except Exception, err:
252 85f03e0d Michael Hanselmann
            queue.acquire()
253 85f03e0d Michael Hanselmann
            try:
254 85f03e0d Michael Hanselmann
              try:
255 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
256 85f03e0d Michael Hanselmann
                op.result = str(err)
257 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
258 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
259 85f03e0d Michael Hanselmann
              finally:
260 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
261 85f03e0d Michael Hanselmann
            finally:
262 85f03e0d Michael Hanselmann
              queue.release()
263 85f03e0d Michael Hanselmann
            raise
264 85f03e0d Michael Hanselmann
265 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
266 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
267 85f03e0d Michael Hanselmann
      except:
268 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
269 e2715f69 Michael Hanselmann
    finally:
270 85f03e0d Michael Hanselmann
      queue.acquire()
271 85f03e0d Michael Hanselmann
      try:
272 65548ed5 Michael Hanselmann
        try:
273 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
274 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
275 65548ed5 Michael Hanselmann
        finally:
276 65548ed5 Michael Hanselmann
          job_id = job.id
277 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
278 85f03e0d Michael Hanselmann
      finally:
279 85f03e0d Michael Hanselmann
        queue.release()
280 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
281 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
282 e2715f69 Michael Hanselmann
283 e2715f69 Michael Hanselmann
284 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
285 5bdce580 Michael Hanselmann
  def __init__(self, queue):
286 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
287 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
288 5bdce580 Michael Hanselmann
    self.queue = queue
289 e2715f69 Michael Hanselmann
290 e2715f69 Michael Hanselmann
291 85f03e0d Michael Hanselmann
class JobQueue(object):
292 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
293 f1da30e6 Michael Hanselmann
294 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
295 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
296 db37da70 Michael Hanselmann

297 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
298 db37da70 Michael Hanselmann
    usually called from other classes.
299 db37da70 Michael Hanselmann

300 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
301 db37da70 Michael Hanselmann

302 db37da70 Michael Hanselmann
    Example:
303 db37da70 Michael Hanselmann
      @utils.LockedMethod
304 db37da70 Michael Hanselmann
      @_RequireOpenQueue
305 db37da70 Michael Hanselmann
      def Example(self):
306 db37da70 Michael Hanselmann
        pass
307 db37da70 Michael Hanselmann

308 db37da70 Michael Hanselmann
    """
309 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
310 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
311 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
312 db37da70 Michael Hanselmann
    return wrapper
313 db37da70 Michael Hanselmann
314 85f03e0d Michael Hanselmann
  def __init__(self, context):
315 5bdce580 Michael Hanselmann
    self.context = context
316 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
317 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
318 f1da30e6 Michael Hanselmann
319 85f03e0d Michael Hanselmann
    # Locking
320 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
321 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
322 85f03e0d Michael Hanselmann
    self.release = self._lock.release
323 85f03e0d Michael Hanselmann
324 04ab05ce Michael Hanselmann
    # Initialize
325 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
326 f1da30e6 Michael Hanselmann
327 04ab05ce Michael Hanselmann
    # Read serial file
328 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
329 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
330 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
331 c4beba1c Iustin Pop
332 23752136 Michael Hanselmann
    # Get initial list of nodes
333 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
334 8e00939c Michael Hanselmann
335 8e00939c Michael Hanselmann
    # Remove master node
336 8e00939c Michael Hanselmann
    try:
337 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
338 8e00939c Michael Hanselmann
    except ValueError:
339 8e00939c Michael Hanselmann
      pass
340 23752136 Michael Hanselmann
341 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
342 23752136 Michael Hanselmann
343 85f03e0d Michael Hanselmann
    # Setup worker pool
344 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
345 85f03e0d Michael Hanselmann
346 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
347 85f03e0d Michael Hanselmann
    # we're still doing our work.
348 85f03e0d Michael Hanselmann
    self.acquire()
349 85f03e0d Michael Hanselmann
    try:
350 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
351 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
352 85f03e0d Michael Hanselmann
353 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
354 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
355 85f03e0d Michael Hanselmann
356 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
357 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
358 85f03e0d Michael Hanselmann
          try:
359 85f03e0d Michael Hanselmann
            for op in job.ops:
360 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
361 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
362 85f03e0d Michael Hanselmann
          finally:
363 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
364 85f03e0d Michael Hanselmann
    finally:
365 85f03e0d Michael Hanselmann
      self.release()
366 85f03e0d Michael Hanselmann
367 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
368 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
369 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
370 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
371 23752136 Michael Hanselmann
372 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
373 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
374 23752136 Michael Hanselmann
375 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
376 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
377 23752136 Michael Hanselmann
378 d2e03a33 Michael Hanselmann
    # Upload current serial file
379 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
380 d2e03a33 Michael Hanselmann
381 d2e03a33 Michael Hanselmann
    for file_name in files:
382 9f774ee8 Michael Hanselmann
      # Read file content
383 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
384 9f774ee8 Michael Hanselmann
      try:
385 9f774ee8 Michael Hanselmann
        content = fd.read()
386 9f774ee8 Michael Hanselmann
      finally:
387 9f774ee8 Michael Hanselmann
        fd.close()
388 9f774ee8 Michael Hanselmann
389 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
390 d2e03a33 Michael Hanselmann
      if not result[node_name]:
391 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
392 d2e03a33 Michael Hanselmann
393 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
394 d2e03a33 Michael Hanselmann
395 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
396 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
397 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
398 23752136 Michael Hanselmann
    try:
399 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
400 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
401 d2e03a33 Michael Hanselmann
    except KeyError:
402 23752136 Michael Hanselmann
      pass
403 23752136 Michael Hanselmann
404 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
405 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
406 8e00939c Michael Hanselmann

407 8e00939c Michael Hanselmann
    """
408 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
409 8e00939c Michael Hanselmann
410 23752136 Michael Hanselmann
    failed_nodes = 0
411 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
412 8e00939c Michael Hanselmann
    for node in self._nodes:
413 23752136 Michael Hanselmann
      if not result[node]:
414 23752136 Michael Hanselmann
        failed_nodes += 1
415 23752136 Michael Hanselmann
        logging.error("Copy of job queue file to node %s failed", node)
416 23752136 Michael Hanselmann
417 23752136 Michael Hanselmann
    # TODO: check failed_nodes
418 23752136 Michael Hanselmann
419 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
420 abc1f2ce Michael Hanselmann
    os.rename(old, new)
421 abc1f2ce Michael Hanselmann
422 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
423 abc1f2ce Michael Hanselmann
    for node in self._nodes:
424 abc1f2ce Michael Hanselmann
      if not result[node]:
425 abc1f2ce Michael Hanselmann
        logging.error("Moving %s to %s failed on %s", old, new, node)
426 abc1f2ce Michael Hanselmann
427 abc1f2ce Michael Hanselmann
    # TODO: check failed nodes
428 abc1f2ce Michael Hanselmann
429 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
430 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
431 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
432 85f03e0d Michael Hanselmann
    if job_id < 0:
433 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
434 85f03e0d Michael Hanselmann
435 85f03e0d Michael Hanselmann
    return str(job_id)
436 85f03e0d Michael Hanselmann
437 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
438 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
439 f1da30e6 Michael Hanselmann

440 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
441 f1da30e6 Michael Hanselmann

442 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
443 f1da30e6 Michael Hanselmann

444 f1da30e6 Michael Hanselmann
    """
445 f1da30e6 Michael Hanselmann
    # New number
446 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
447 f1da30e6 Michael Hanselmann
448 f1da30e6 Michael Hanselmann
    # Write to file
449 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
450 23752136 Michael Hanselmann
                                        "%s\n" % serial)
451 f1da30e6 Michael Hanselmann
452 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
453 f1da30e6 Michael Hanselmann
    self._last_serial = serial
454 f1da30e6 Michael Hanselmann
455 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
456 f1da30e6 Michael Hanselmann
457 85f03e0d Michael Hanselmann
  @staticmethod
458 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
459 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
460 f1da30e6 Michael Hanselmann
461 85f03e0d Michael Hanselmann
  @staticmethod
462 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
463 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
464 0cb94105 Michael Hanselmann
465 85f03e0d Michael Hanselmann
  @classmethod
466 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
467 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
468 fae737ac Michael Hanselmann
    if m:
469 fae737ac Michael Hanselmann
      return m.group(1)
470 fae737ac Michael Hanselmann
    else:
471 fae737ac Michael Hanselmann
      return None
472 fae737ac Michael Hanselmann
473 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
474 911a495b Iustin Pop
    """Return all known job IDs.
475 911a495b Iustin Pop

476 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
477 911a495b Iustin Pop
    included. Currently this argument is unused.
478 911a495b Iustin Pop

479 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
480 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
481 ac0930b9 Iustin Pop
    extra IDs).
482 ac0930b9 Iustin Pop

483 911a495b Iustin Pop
    """
484 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
485 f0d874fe Iustin Pop
    jlist.sort()
486 f0d874fe Iustin Pop
    return jlist
487 911a495b Iustin Pop
488 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
489 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
490 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
491 f1da30e6 Michael Hanselmann
492 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
493 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
494 5685c1a5 Michael Hanselmann
    if job:
495 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
496 5685c1a5 Michael Hanselmann
      return job
497 ac0930b9 Iustin Pop
498 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
499 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
500 f1da30e6 Michael Hanselmann
    try:
501 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
502 f1da30e6 Michael Hanselmann
    except IOError, err:
503 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
504 f1da30e6 Michael Hanselmann
        return None
505 f1da30e6 Michael Hanselmann
      raise
506 f1da30e6 Michael Hanselmann
    try:
507 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
508 f1da30e6 Michael Hanselmann
    finally:
509 f1da30e6 Michael Hanselmann
      fd.close()
510 f1da30e6 Michael Hanselmann
511 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
512 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
513 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
514 ac0930b9 Iustin Pop
    return job
515 f1da30e6 Michael Hanselmann
516 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
517 911a495b Iustin Pop
    if not job_ids:
518 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
519 f1da30e6 Michael Hanselmann
520 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
521 f1da30e6 Michael Hanselmann
522 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
523 db37da70 Michael Hanselmann
  @_RequireOpenQueue
524 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
525 85f03e0d Michael Hanselmann
    """Create and store a new job.
526 f1da30e6 Michael Hanselmann

527 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
528 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
529 c3f0a12f Iustin Pop

530 c3f0a12f Iustin Pop
    @type ops: list
531 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
532 c3f0a12f Iustin Pop

533 c3f0a12f Iustin Pop
    """
534 f1da30e6 Michael Hanselmann
    # Get job identifier
535 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
536 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
537 f1da30e6 Michael Hanselmann
538 f1da30e6 Michael Hanselmann
    # Write to disk
539 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
540 f1da30e6 Michael Hanselmann
541 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
542 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
543 ac0930b9 Iustin Pop
544 85f03e0d Michael Hanselmann
    # Add to worker pool
545 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
546 85f03e0d Michael Hanselmann
547 85f03e0d Michael Hanselmann
    return job.id
548 f1da30e6 Michael Hanselmann
549 db37da70 Michael Hanselmann
  @_RequireOpenQueue
550 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
551 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
552 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
553 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
554 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
555 ac0930b9 Iustin Pop
556 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
557 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
558 dfe57c22 Michael Hanselmann
559 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
560 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
561 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
562 5c735209 Iustin Pop
                        timeout):
563 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
564 6c5a7090 Michael Hanselmann

565 6c5a7090 Michael Hanselmann
    @type job_id: string
566 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
567 6c5a7090 Michael Hanselmann
    @type fields: list of strings
568 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
569 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
570 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
571 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
572 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
573 5c735209 Iustin Pop
    @type timeout: float
574 5c735209 Iustin Pop
    @param timeout: maximum time to wait
575 6c5a7090 Michael Hanselmann

576 6c5a7090 Michael Hanselmann
    """
577 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
578 5c735209 Iustin Pop
    end_time = time.time() + timeout
579 dfe57c22 Michael Hanselmann
    while True:
580 5c735209 Iustin Pop
      delta_time = end_time - time.time()
581 5c735209 Iustin Pop
      if delta_time < 0:
582 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
583 5c735209 Iustin Pop
584 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
585 6c5a7090 Michael Hanselmann
      if not job:
586 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
587 6c5a7090 Michael Hanselmann
        break
588 dfe57c22 Michael Hanselmann
589 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
590 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
591 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
592 dfe57c22 Michael Hanselmann
593 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
594 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
595 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
596 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
597 dfe57c22 Michael Hanselmann
      # significantly different.
598 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
599 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
600 dfe57c22 Michael Hanselmann
601 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
602 6c5a7090 Michael Hanselmann
                        constants.JOB_STATUS_RUNNING):
603 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
604 6c5a7090 Michael Hanselmann
        # no changes.
605 dfe57c22 Michael Hanselmann
        break
606 dfe57c22 Michael Hanselmann
607 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
608 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
609 6c5a7090 Michael Hanselmann
        break
610 6c5a7090 Michael Hanselmann
611 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
612 6c5a7090 Michael Hanselmann
613 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
614 5c735209 Iustin Pop
      job.change.wait(delta_time)
615 dfe57c22 Michael Hanselmann
616 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
617 dfe57c22 Michael Hanselmann
618 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
619 dfe57c22 Michael Hanselmann
620 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
621 db37da70 Michael Hanselmann
  @_RequireOpenQueue
622 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
623 188c5e0a Michael Hanselmann
    """Cancels a job.
624 188c5e0a Michael Hanselmann

625 188c5e0a Michael Hanselmann
    @type job_id: string
626 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
627 188c5e0a Michael Hanselmann

628 188c5e0a Michael Hanselmann
    """
629 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
630 188c5e0a Michael Hanselmann
631 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
632 188c5e0a Michael Hanselmann
    if not job:
633 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
634 188c5e0a Michael Hanselmann
      return
635 188c5e0a Michael Hanselmann
636 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
637 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
638 188c5e0a Michael Hanselmann
      return
639 188c5e0a Michael Hanselmann
640 85f03e0d Michael Hanselmann
    try:
641 85f03e0d Michael Hanselmann
      for op in job.ops:
642 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
643 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
644 85f03e0d Michael Hanselmann
    finally:
645 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
646 188c5e0a Michael Hanselmann
647 c609f802 Michael Hanselmann
  @utils.LockedMethod
648 db37da70 Michael Hanselmann
  @_RequireOpenQueue
649 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
650 c609f802 Michael Hanselmann
    """Archives a job.
651 c609f802 Michael Hanselmann

652 c609f802 Michael Hanselmann
    @type job_id: string
653 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
654 c609f802 Michael Hanselmann

655 c609f802 Michael Hanselmann
    """
656 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
657 c609f802 Michael Hanselmann
658 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
659 c609f802 Michael Hanselmann
    if not job:
660 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
661 c609f802 Michael Hanselmann
      return
662 c609f802 Michael Hanselmann
663 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
664 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
665 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
666 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
667 c609f802 Michael Hanselmann
      return
668 c609f802 Michael Hanselmann
669 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
670 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
671 c609f802 Michael Hanselmann
672 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
673 c609f802 Michael Hanselmann
674 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
675 f1da30e6 Michael Hanselmann
676 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
677 e2715f69 Michael Hanselmann
    row = []
678 e2715f69 Michael Hanselmann
    for fname in fields:
679 e2715f69 Michael Hanselmann
      if fname == "id":
680 e2715f69 Michael Hanselmann
        row.append(job.id)
681 e2715f69 Michael Hanselmann
      elif fname == "status":
682 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
683 af30b2fd Michael Hanselmann
      elif fname == "ops":
684 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
685 af30b2fd Michael Hanselmann
      elif fname == "opresult":
686 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
687 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
688 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
689 e2715f69 Michael Hanselmann
      else:
690 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
691 e2715f69 Michael Hanselmann
    return row
692 e2715f69 Michael Hanselmann
693 85f03e0d Michael Hanselmann
  @utils.LockedMethod
694 db37da70 Michael Hanselmann
  @_RequireOpenQueue
695 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
696 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
697 e2715f69 Michael Hanselmann

698 e2715f69 Michael Hanselmann
    Args:
699 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
700 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
701 e2715f69 Michael Hanselmann

702 e2715f69 Michael Hanselmann
    """
703 85f03e0d Michael Hanselmann
    jobs = []
704 e2715f69 Michael Hanselmann
705 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
706 85f03e0d Michael Hanselmann
      if job is None:
707 85f03e0d Michael Hanselmann
        jobs.append(None)
708 85f03e0d Michael Hanselmann
      else:
709 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
710 e2715f69 Michael Hanselmann
711 85f03e0d Michael Hanselmann
    return jobs
712 e2715f69 Michael Hanselmann
713 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
714 db37da70 Michael Hanselmann
  @_RequireOpenQueue
715 e2715f69 Michael Hanselmann
  def Shutdown(self):
716 e2715f69 Michael Hanselmann
    """Stops the job queue.
717 e2715f69 Michael Hanselmann

718 e2715f69 Michael Hanselmann
    """
719 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
720 85f03e0d Michael Hanselmann
721 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
722 04ab05ce Michael Hanselmann
    self._queue_lock = None