Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 65548ed5

History | View | Annotate | Download (19.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 498ae1cc Iustin Pop
37 e2715f69 Michael Hanselmann
from ganeti import constants
38 f1da30e6 Michael Hanselmann
from ganeti import serializer
39 e2715f69 Michael Hanselmann
from ganeti import workerpool
40 f1da30e6 Michael Hanselmann
from ganeti import opcodes
41 7a1ecaed Iustin Pop
from ganeti import errors
42 e2715f69 Michael Hanselmann
from ganeti import mcpu
43 7996a135 Iustin Pop
from ganeti import utils
44 04ab05ce Michael Hanselmann
from ganeti import jstore
45 c3f0a12f Iustin Pop
from ganeti import rpc
46 e2715f69 Michael Hanselmann
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
49 e2715f69 Michael Hanselmann
50 498ae1cc Iustin Pop
51 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
52 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
53 e2715f69 Michael Hanselmann

54 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
55 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
56 f1048938 Iustin Pop

57 e2715f69 Michael Hanselmann
  """
58 85f03e0d Michael Hanselmann
  def __init__(self, op):
59 85f03e0d Michael Hanselmann
    self.input = op
60 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
61 85f03e0d Michael Hanselmann
    self.result = None
62 85f03e0d Michael Hanselmann
    self.log = []
63 f1da30e6 Michael Hanselmann
64 f1da30e6 Michael Hanselmann
  @classmethod
65 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
66 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
67 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
68 85f03e0d Michael Hanselmann
    obj.status = state["status"]
69 85f03e0d Michael Hanselmann
    obj.result = state["result"]
70 85f03e0d Michael Hanselmann
    obj.log = state["log"]
71 f1da30e6 Michael Hanselmann
    return obj
72 f1da30e6 Michael Hanselmann
73 f1da30e6 Michael Hanselmann
  def Serialize(self):
74 6c5a7090 Michael Hanselmann
    return {
75 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
76 6c5a7090 Michael Hanselmann
      "status": self.status,
77 6c5a7090 Michael Hanselmann
      "result": self.result,
78 6c5a7090 Michael Hanselmann
      "log": self.log,
79 6c5a7090 Michael Hanselmann
      }
80 f1048938 Iustin Pop
81 e2715f69 Michael Hanselmann
82 e2715f69 Michael Hanselmann
class _QueuedJob(object):
83 e2715f69 Michael Hanselmann
  """In-memory job representation.
84 e2715f69 Michael Hanselmann

85 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
86 6c5a7090 Michael Hanselmann
  care of by users of this class.
87 e2715f69 Michael Hanselmann

88 e2715f69 Michael Hanselmann
  """
89 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
90 e2715f69 Michael Hanselmann
    if not ops:
91 e2715f69 Michael Hanselmann
      # TODO
92 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
93 e2715f69 Michael Hanselmann
94 85f03e0d Michael Hanselmann
    self.queue = queue
95 f1da30e6 Michael Hanselmann
    self.id = job_id
96 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
97 85f03e0d Michael Hanselmann
    self.run_op_index = -1
98 6c5a7090 Michael Hanselmann
    self.log_serial = 0
99 6c5a7090 Michael Hanselmann
100 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
101 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
102 f1da30e6 Michael Hanselmann
103 f1da30e6 Michael Hanselmann
  @classmethod
104 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
105 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
106 85f03e0d Michael Hanselmann
    obj.queue = queue
107 85f03e0d Michael Hanselmann
    obj.id = state["id"]
108 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
109 6c5a7090 Michael Hanselmann
110 6c5a7090 Michael Hanselmann
    obj.ops = []
111 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
112 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
113 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
114 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
115 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
116 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
117 6c5a7090 Michael Hanselmann
118 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
119 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
120 6c5a7090 Michael Hanselmann
121 f1da30e6 Michael Hanselmann
    return obj
122 f1da30e6 Michael Hanselmann
123 f1da30e6 Michael Hanselmann
  def Serialize(self):
124 f1da30e6 Michael Hanselmann
    return {
125 f1da30e6 Michael Hanselmann
      "id": self.id,
126 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
127 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
128 f1da30e6 Michael Hanselmann
      }
129 f1da30e6 Michael Hanselmann
130 85f03e0d Michael Hanselmann
  def CalcStatus(self):
131 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
132 e2715f69 Michael Hanselmann
133 e2715f69 Michael Hanselmann
    all_success = True
134 85f03e0d Michael Hanselmann
    for op in self.ops:
135 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
136 e2715f69 Michael Hanselmann
        continue
137 e2715f69 Michael Hanselmann
138 e2715f69 Michael Hanselmann
      all_success = False
139 e2715f69 Michael Hanselmann
140 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
141 e2715f69 Michael Hanselmann
        pass
142 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
143 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
144 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
145 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
146 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
147 f1da30e6 Michael Hanselmann
        break
148 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
149 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
150 4cb1d919 Michael Hanselmann
        break
151 e2715f69 Michael Hanselmann
152 e2715f69 Michael Hanselmann
    if all_success:
153 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
154 e2715f69 Michael Hanselmann
155 e2715f69 Michael Hanselmann
    return status
156 e2715f69 Michael Hanselmann
157 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
158 6c5a7090 Michael Hanselmann
    if newer_than is None:
159 6c5a7090 Michael Hanselmann
      serial = -1
160 6c5a7090 Michael Hanselmann
    else:
161 6c5a7090 Michael Hanselmann
      serial = newer_than
162 6c5a7090 Michael Hanselmann
163 6c5a7090 Michael Hanselmann
    entries = []
164 6c5a7090 Michael Hanselmann
    for op in self.ops:
165 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
166 6c5a7090 Michael Hanselmann
167 6c5a7090 Michael Hanselmann
    return entries
168 6c5a7090 Michael Hanselmann
169 f1048938 Iustin Pop
170 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
171 85f03e0d Michael Hanselmann
  def RunTask(self, job):
172 e2715f69 Michael Hanselmann
    """Job executor.
173 e2715f69 Michael Hanselmann

174 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
175 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
176 e2715f69 Michael Hanselmann

177 e2715f69 Michael Hanselmann
    """
178 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
179 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
180 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
181 85f03e0d Michael Hanselmann
    queue = job.queue
182 e2715f69 Michael Hanselmann
    try:
183 85f03e0d Michael Hanselmann
      try:
184 85f03e0d Michael Hanselmann
        count = len(job.ops)
185 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
186 85f03e0d Michael Hanselmann
          try:
187 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
188 85f03e0d Michael Hanselmann
189 85f03e0d Michael Hanselmann
            queue.acquire()
190 85f03e0d Michael Hanselmann
            try:
191 85f03e0d Michael Hanselmann
              job.run_op_index = idx
192 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
193 85f03e0d Michael Hanselmann
              op.result = None
194 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
195 85f03e0d Michael Hanselmann
196 38206f3c Iustin Pop
              input_opcode = op.input
197 85f03e0d Michael Hanselmann
            finally:
198 85f03e0d Michael Hanselmann
              queue.release()
199 85f03e0d Michael Hanselmann
200 dfe57c22 Michael Hanselmann
            def _Log(*args):
201 6c5a7090 Michael Hanselmann
              """Append a log entry.
202 6c5a7090 Michael Hanselmann

203 6c5a7090 Michael Hanselmann
              """
204 6c5a7090 Michael Hanselmann
              assert len(args) < 3
205 6c5a7090 Michael Hanselmann
206 6c5a7090 Michael Hanselmann
              if len(args) == 1:
207 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
208 6c5a7090 Michael Hanselmann
                log_msg = args[0]
209 6c5a7090 Michael Hanselmann
              else:
210 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
211 6c5a7090 Michael Hanselmann
212 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
213 6c5a7090 Michael Hanselmann
              # precision.
214 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
215 dfe57c22 Michael Hanselmann
216 6c5a7090 Michael Hanselmann
              queue.acquire()
217 dfe57c22 Michael Hanselmann
              try:
218 6c5a7090 Michael Hanselmann
                job.log_serial += 1
219 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
220 6c5a7090 Michael Hanselmann
221 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
222 dfe57c22 Michael Hanselmann
              finally:
223 6c5a7090 Michael Hanselmann
                queue.release()
224 dfe57c22 Michael Hanselmann
225 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
226 dfe57c22 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode, _Log)
227 85f03e0d Michael Hanselmann
228 85f03e0d Michael Hanselmann
            queue.acquire()
229 85f03e0d Michael Hanselmann
            try:
230 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
231 85f03e0d Michael Hanselmann
              op.result = result
232 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
233 85f03e0d Michael Hanselmann
            finally:
234 85f03e0d Michael Hanselmann
              queue.release()
235 85f03e0d Michael Hanselmann
236 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
237 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
238 85f03e0d Michael Hanselmann
          except Exception, err:
239 85f03e0d Michael Hanselmann
            queue.acquire()
240 85f03e0d Michael Hanselmann
            try:
241 85f03e0d Michael Hanselmann
              try:
242 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
243 85f03e0d Michael Hanselmann
                op.result = str(err)
244 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
245 85f03e0d Michael Hanselmann
              finally:
246 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
247 85f03e0d Michael Hanselmann
            finally:
248 85f03e0d Michael Hanselmann
              queue.release()
249 85f03e0d Michael Hanselmann
            raise
250 85f03e0d Michael Hanselmann
251 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
252 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
253 85f03e0d Michael Hanselmann
      except:
254 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
255 e2715f69 Michael Hanselmann
    finally:
256 85f03e0d Michael Hanselmann
      queue.acquire()
257 85f03e0d Michael Hanselmann
      try:
258 65548ed5 Michael Hanselmann
        try:
259 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
260 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
261 65548ed5 Michael Hanselmann
        finally:
262 65548ed5 Michael Hanselmann
          job_id = job.id
263 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
264 85f03e0d Michael Hanselmann
      finally:
265 85f03e0d Michael Hanselmann
        queue.release()
266 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
267 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
268 e2715f69 Michael Hanselmann
269 e2715f69 Michael Hanselmann
270 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
271 5bdce580 Michael Hanselmann
  def __init__(self, queue):
272 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
273 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
274 5bdce580 Michael Hanselmann
    self.queue = queue
275 e2715f69 Michael Hanselmann
276 e2715f69 Michael Hanselmann
277 85f03e0d Michael Hanselmann
class JobQueue(object):
278 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
279 f1da30e6 Michael Hanselmann
280 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
281 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
282 db37da70 Michael Hanselmann

283 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
284 db37da70 Michael Hanselmann
    usually called from other classes.
285 db37da70 Michael Hanselmann

286 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
287 db37da70 Michael Hanselmann

288 db37da70 Michael Hanselmann
    Example:
289 db37da70 Michael Hanselmann
      @utils.LockedMethod
290 db37da70 Michael Hanselmann
      @_RequireOpenQueue
291 db37da70 Michael Hanselmann
      def Example(self):
292 db37da70 Michael Hanselmann
        pass
293 db37da70 Michael Hanselmann

294 db37da70 Michael Hanselmann
    """
295 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
296 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
297 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
298 db37da70 Michael Hanselmann
    return wrapper
299 db37da70 Michael Hanselmann
300 85f03e0d Michael Hanselmann
  def __init__(self, context):
301 5bdce580 Michael Hanselmann
    self.context = context
302 ac0930b9 Iustin Pop
    self._memcache = {}
303 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
304 f1da30e6 Michael Hanselmann
305 85f03e0d Michael Hanselmann
    # Locking
306 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
307 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
308 85f03e0d Michael Hanselmann
    self.release = self._lock.release
309 85f03e0d Michael Hanselmann
310 04ab05ce Michael Hanselmann
    # Initialize
311 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
312 f1da30e6 Michael Hanselmann
313 04ab05ce Michael Hanselmann
    # Read serial file
314 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
315 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
316 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
317 c4beba1c Iustin Pop
318 23752136 Michael Hanselmann
    # Get initial list of nodes
319 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
320 8e00939c Michael Hanselmann
321 8e00939c Michael Hanselmann
    # Remove master node
322 8e00939c Michael Hanselmann
    try:
323 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
324 8e00939c Michael Hanselmann
    except ValueError:
325 8e00939c Michael Hanselmann
      pass
326 23752136 Michael Hanselmann
327 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
328 23752136 Michael Hanselmann
329 85f03e0d Michael Hanselmann
    # Setup worker pool
330 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
331 85f03e0d Michael Hanselmann
332 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
333 85f03e0d Michael Hanselmann
    # we're still doing our work.
334 85f03e0d Michael Hanselmann
    self.acquire()
335 85f03e0d Michael Hanselmann
    try:
336 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
337 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
338 85f03e0d Michael Hanselmann
339 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
340 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
341 85f03e0d Michael Hanselmann
342 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
343 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
344 85f03e0d Michael Hanselmann
          try:
345 85f03e0d Michael Hanselmann
            for op in job.ops:
346 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
347 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
348 85f03e0d Michael Hanselmann
          finally:
349 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
350 85f03e0d Michael Hanselmann
    finally:
351 85f03e0d Michael Hanselmann
      self.release()
352 85f03e0d Michael Hanselmann
353 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
354 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
355 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
356 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
357 23752136 Michael Hanselmann
358 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
359 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
360 23752136 Michael Hanselmann
361 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
362 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
363 23752136 Michael Hanselmann
364 d2e03a33 Michael Hanselmann
    # Upload current serial file
365 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
366 d2e03a33 Michael Hanselmann
367 d2e03a33 Michael Hanselmann
    for file_name in files:
368 9f774ee8 Michael Hanselmann
      # Read file content
369 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
370 9f774ee8 Michael Hanselmann
      try:
371 9f774ee8 Michael Hanselmann
        content = fd.read()
372 9f774ee8 Michael Hanselmann
      finally:
373 9f774ee8 Michael Hanselmann
        fd.close()
374 9f774ee8 Michael Hanselmann
375 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
376 d2e03a33 Michael Hanselmann
      if not result[node_name]:
377 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
378 d2e03a33 Michael Hanselmann
379 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
380 d2e03a33 Michael Hanselmann
381 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
382 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
383 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
384 23752136 Michael Hanselmann
    try:
385 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
386 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
387 d2e03a33 Michael Hanselmann
    except KeyError:
388 23752136 Michael Hanselmann
      pass
389 23752136 Michael Hanselmann
390 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
391 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
392 8e00939c Michael Hanselmann

393 8e00939c Michael Hanselmann
    """
394 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
395 8e00939c Michael Hanselmann
396 23752136 Michael Hanselmann
    failed_nodes = 0
397 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
398 8e00939c Michael Hanselmann
    for node in self._nodes:
399 23752136 Michael Hanselmann
      if not result[node]:
400 23752136 Michael Hanselmann
        failed_nodes += 1
401 23752136 Michael Hanselmann
        logging.error("Copy of job queue file to node %s failed", node)
402 23752136 Michael Hanselmann
403 23752136 Michael Hanselmann
    # TODO: check failed_nodes
404 23752136 Michael Hanselmann
405 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
406 abc1f2ce Michael Hanselmann
    os.rename(old, new)
407 abc1f2ce Michael Hanselmann
408 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
409 abc1f2ce Michael Hanselmann
    for node in self._nodes:
410 abc1f2ce Michael Hanselmann
      if not result[node]:
411 abc1f2ce Michael Hanselmann
        logging.error("Moving %s to %s failed on %s", old, new, node)
412 abc1f2ce Michael Hanselmann
413 abc1f2ce Michael Hanselmann
    # TODO: check failed nodes
414 abc1f2ce Michael Hanselmann
415 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
416 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
417 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
418 85f03e0d Michael Hanselmann
    if job_id < 0:
419 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
420 85f03e0d Michael Hanselmann
421 85f03e0d Michael Hanselmann
    return str(job_id)
422 85f03e0d Michael Hanselmann
423 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
424 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
425 f1da30e6 Michael Hanselmann

426 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
427 f1da30e6 Michael Hanselmann

428 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
429 f1da30e6 Michael Hanselmann

430 f1da30e6 Michael Hanselmann
    """
431 f1da30e6 Michael Hanselmann
    # New number
432 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
433 f1da30e6 Michael Hanselmann
434 f1da30e6 Michael Hanselmann
    # Write to file
435 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
436 23752136 Michael Hanselmann
                                        "%s\n" % serial)
437 f1da30e6 Michael Hanselmann
438 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
439 f1da30e6 Michael Hanselmann
    self._last_serial = serial
440 f1da30e6 Michael Hanselmann
441 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
442 f1da30e6 Michael Hanselmann
443 85f03e0d Michael Hanselmann
  @staticmethod
444 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
445 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
446 f1da30e6 Michael Hanselmann
447 85f03e0d Michael Hanselmann
  @staticmethod
448 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
449 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
450 0cb94105 Michael Hanselmann
451 85f03e0d Michael Hanselmann
  @classmethod
452 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
453 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
454 fae737ac Michael Hanselmann
    if m:
455 fae737ac Michael Hanselmann
      return m.group(1)
456 fae737ac Michael Hanselmann
    else:
457 fae737ac Michael Hanselmann
      return None
458 fae737ac Michael Hanselmann
459 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
460 911a495b Iustin Pop
    """Return all known job IDs.
461 911a495b Iustin Pop

462 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
463 911a495b Iustin Pop
    included. Currently this argument is unused.
464 911a495b Iustin Pop

465 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
466 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
467 ac0930b9 Iustin Pop
    extra IDs).
468 ac0930b9 Iustin Pop

469 911a495b Iustin Pop
    """
470 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
471 f0d874fe Iustin Pop
    jlist.sort()
472 f0d874fe Iustin Pop
    return jlist
473 911a495b Iustin Pop
474 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
475 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
476 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
477 f1da30e6 Michael Hanselmann
478 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
479 ac0930b9 Iustin Pop
    if job_id in self._memcache:
480 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
481 ac0930b9 Iustin Pop
      return self._memcache[job_id]
482 ac0930b9 Iustin Pop
483 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
484 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
485 f1da30e6 Michael Hanselmann
    try:
486 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
487 f1da30e6 Michael Hanselmann
    except IOError, err:
488 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
489 f1da30e6 Michael Hanselmann
        return None
490 f1da30e6 Michael Hanselmann
      raise
491 f1da30e6 Michael Hanselmann
    try:
492 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
493 f1da30e6 Michael Hanselmann
    finally:
494 f1da30e6 Michael Hanselmann
      fd.close()
495 f1da30e6 Michael Hanselmann
496 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
497 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
498 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
499 ac0930b9 Iustin Pop
    return job
500 f1da30e6 Michael Hanselmann
501 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
502 911a495b Iustin Pop
    if not job_ids:
503 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
504 f1da30e6 Michael Hanselmann
505 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
506 f1da30e6 Michael Hanselmann
507 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
508 db37da70 Michael Hanselmann
  @_RequireOpenQueue
509 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
510 85f03e0d Michael Hanselmann
    """Create and store a new job.
511 f1da30e6 Michael Hanselmann

512 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
513 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
514 c3f0a12f Iustin Pop

515 c3f0a12f Iustin Pop
    @type ops: list
516 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
517 c3f0a12f Iustin Pop

518 c3f0a12f Iustin Pop
    """
519 f1da30e6 Michael Hanselmann
    # Get job identifier
520 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
521 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
522 f1da30e6 Michael Hanselmann
523 f1da30e6 Michael Hanselmann
    # Write to disk
524 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
525 f1da30e6 Michael Hanselmann
526 205d71fd Michael Hanselmann
    logging.debug("Added new job %s to the cache", job_id)
527 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
528 ac0930b9 Iustin Pop
529 85f03e0d Michael Hanselmann
    # Add to worker pool
530 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
531 85f03e0d Michael Hanselmann
532 85f03e0d Michael Hanselmann
    return job.id
533 f1da30e6 Michael Hanselmann
534 db37da70 Michael Hanselmann
  @_RequireOpenQueue
535 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
536 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
537 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
538 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
539 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
540 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
541 ac0930b9 Iustin Pop
542 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
543 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
544 dfe57c22 Michael Hanselmann
545 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
546 ac0930b9 Iustin Pop
    """Clean the memory cache.
547 ac0930b9 Iustin Pop

548 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
549 ac0930b9 Iustin Pop
    cleaned.
550 ac0930b9 Iustin Pop

551 ac0930b9 Iustin Pop
    """
552 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
553 85f03e0d Michael Hanselmann
554 ac0930b9 Iustin Pop
    for job in self._memcache.values():
555 57f8615f Michael Hanselmann
      if job.id in exclude:
556 ac0930b9 Iustin Pop
        continue
557 85f03e0d Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
558 85f03e0d Michael Hanselmann
                                  constants.JOB_STATUS_RUNNING):
559 205d71fd Michael Hanselmann
        logging.debug("Cleaning job %s from the cache", job.id)
560 ac0930b9 Iustin Pop
        try:
561 ac0930b9 Iustin Pop
          del self._memcache[job.id]
562 ac0930b9 Iustin Pop
        except KeyError:
563 ac0930b9 Iustin Pop
          pass
564 f1da30e6 Michael Hanselmann
565 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
566 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
567 6c5a7090 Michael Hanselmann
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
568 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
569 6c5a7090 Michael Hanselmann

570 6c5a7090 Michael Hanselmann
    @type job_id: string
571 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
572 6c5a7090 Michael Hanselmann
    @type fields: list of strings
573 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
574 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
575 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
576 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
577 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
578 6c5a7090 Michael Hanselmann

579 6c5a7090 Michael Hanselmann
    """
580 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
581 dfe57c22 Michael Hanselmann
582 dfe57c22 Michael Hanselmann
    while True:
583 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
584 6c5a7090 Michael Hanselmann
      if not job:
585 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
586 6c5a7090 Michael Hanselmann
        new_state = None
587 6c5a7090 Michael Hanselmann
        break
588 dfe57c22 Michael Hanselmann
589 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
590 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
591 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
592 dfe57c22 Michael Hanselmann
593 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
594 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
595 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
596 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
597 dfe57c22 Michael Hanselmann
      # significantly different.
598 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
599 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
600 dfe57c22 Michael Hanselmann
601 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
602 6c5a7090 Michael Hanselmann
                        constants.JOB_STATUS_RUNNING):
603 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
604 6c5a7090 Michael Hanselmann
        # no changes.
605 dfe57c22 Michael Hanselmann
        break
606 dfe57c22 Michael Hanselmann
607 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
608 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
609 6c5a7090 Michael Hanselmann
        break
610 6c5a7090 Michael Hanselmann
611 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
612 6c5a7090 Michael Hanselmann
613 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
614 6c5a7090 Michael Hanselmann
      job.change.wait()
615 dfe57c22 Michael Hanselmann
616 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
617 dfe57c22 Michael Hanselmann
618 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
619 dfe57c22 Michael Hanselmann
620 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
621 db37da70 Michael Hanselmann
  @_RequireOpenQueue
622 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
623 188c5e0a Michael Hanselmann
    """Cancels a job.
624 188c5e0a Michael Hanselmann

625 188c5e0a Michael Hanselmann
    @type job_id: string
626 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
627 188c5e0a Michael Hanselmann

628 188c5e0a Michael Hanselmann
    """
629 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
630 188c5e0a Michael Hanselmann
631 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
632 188c5e0a Michael Hanselmann
    if not job:
633 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
634 188c5e0a Michael Hanselmann
      return
635 188c5e0a Michael Hanselmann
636 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
637 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
638 188c5e0a Michael Hanselmann
      return
639 188c5e0a Michael Hanselmann
640 85f03e0d Michael Hanselmann
    try:
641 85f03e0d Michael Hanselmann
      for op in job.ops:
642 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
643 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
644 85f03e0d Michael Hanselmann
    finally:
645 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
646 188c5e0a Michael Hanselmann
647 c609f802 Michael Hanselmann
  @utils.LockedMethod
648 db37da70 Michael Hanselmann
  @_RequireOpenQueue
649 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
650 c609f802 Michael Hanselmann
    """Archives a job.
651 c609f802 Michael Hanselmann

652 c609f802 Michael Hanselmann
    @type job_id: string
653 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
654 c609f802 Michael Hanselmann

655 c609f802 Michael Hanselmann
    """
656 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
657 c609f802 Michael Hanselmann
658 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
659 c609f802 Michael Hanselmann
    if not job:
660 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
661 c609f802 Michael Hanselmann
      return
662 c609f802 Michael Hanselmann
663 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
664 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
665 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
666 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
667 c609f802 Michael Hanselmann
      return
668 c609f802 Michael Hanselmann
669 c609f802 Michael Hanselmann
    try:
670 c609f802 Michael Hanselmann
      old = self._GetJobPath(job.id)
671 c609f802 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
672 c609f802 Michael Hanselmann
673 abc1f2ce Michael Hanselmann
      self._RenameFileUnlocked(old, new)
674 c609f802 Michael Hanselmann
675 c609f802 Michael Hanselmann
      logging.debug("Successfully archived job %s", job.id)
676 c609f802 Michael Hanselmann
    finally:
677 c609f802 Michael Hanselmann
      # Cleaning the cache because we don't know what os.rename actually did
678 c609f802 Michael Hanselmann
      # and to be on the safe side.
679 c609f802 Michael Hanselmann
      self._CleanCacheUnlocked([])
680 f1da30e6 Michael Hanselmann
681 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
682 e2715f69 Michael Hanselmann
    row = []
683 e2715f69 Michael Hanselmann
    for fname in fields:
684 e2715f69 Michael Hanselmann
      if fname == "id":
685 e2715f69 Michael Hanselmann
        row.append(job.id)
686 e2715f69 Michael Hanselmann
      elif fname == "status":
687 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
688 af30b2fd Michael Hanselmann
      elif fname == "ops":
689 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
690 af30b2fd Michael Hanselmann
      elif fname == "opresult":
691 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
692 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
693 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
694 e2715f69 Michael Hanselmann
      else:
695 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
696 e2715f69 Michael Hanselmann
    return row
697 e2715f69 Michael Hanselmann
698 85f03e0d Michael Hanselmann
  @utils.LockedMethod
699 db37da70 Michael Hanselmann
  @_RequireOpenQueue
700 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
701 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
702 e2715f69 Michael Hanselmann

703 e2715f69 Michael Hanselmann
    Args:
704 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
705 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
706 e2715f69 Michael Hanselmann

707 e2715f69 Michael Hanselmann
    """
708 85f03e0d Michael Hanselmann
    jobs = []
709 e2715f69 Michael Hanselmann
710 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
711 85f03e0d Michael Hanselmann
      if job is None:
712 85f03e0d Michael Hanselmann
        jobs.append(None)
713 85f03e0d Michael Hanselmann
      else:
714 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
715 e2715f69 Michael Hanselmann
716 85f03e0d Michael Hanselmann
    return jobs
717 e2715f69 Michael Hanselmann
718 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
719 db37da70 Michael Hanselmann
  @_RequireOpenQueue
720 e2715f69 Michael Hanselmann
  def Shutdown(self):
721 e2715f69 Michael Hanselmann
    """Stops the job queue.
722 e2715f69 Michael Hanselmann

723 e2715f69 Michael Hanselmann
    """
724 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
725 85f03e0d Michael Hanselmann
726 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
727 04ab05ce Michael Hanselmann
    self._queue_lock = None