Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ a1578d63

History | View | Annotate | Download (20 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
49 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
50 e2715f69 Michael Hanselmann
51 498ae1cc Iustin Pop
52 70552c46 Michael Hanselmann
def TimeStampNow():
53 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
54 70552c46 Michael Hanselmann
55 70552c46 Michael Hanselmann
56 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
57 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
58 e2715f69 Michael Hanselmann

59 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
60 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
61 f1048938 Iustin Pop

62 e2715f69 Michael Hanselmann
  """
63 85f03e0d Michael Hanselmann
  def __init__(self, op):
64 85f03e0d Michael Hanselmann
    self.input = op
65 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
66 85f03e0d Michael Hanselmann
    self.result = None
67 85f03e0d Michael Hanselmann
    self.log = []
68 70552c46 Michael Hanselmann
    self.start_timestamp = None
69 70552c46 Michael Hanselmann
    self.end_timestamp = None
70 f1da30e6 Michael Hanselmann
71 f1da30e6 Michael Hanselmann
  @classmethod
72 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
73 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
74 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75 85f03e0d Michael Hanselmann
    obj.status = state["status"]
76 85f03e0d Michael Hanselmann
    obj.result = state["result"]
77 85f03e0d Michael Hanselmann
    obj.log = state["log"]
78 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
79 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
80 f1da30e6 Michael Hanselmann
    return obj
81 f1da30e6 Michael Hanselmann
82 f1da30e6 Michael Hanselmann
  def Serialize(self):
83 6c5a7090 Michael Hanselmann
    return {
84 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
85 6c5a7090 Michael Hanselmann
      "status": self.status,
86 6c5a7090 Michael Hanselmann
      "result": self.result,
87 6c5a7090 Michael Hanselmann
      "log": self.log,
88 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
89 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
90 6c5a7090 Michael Hanselmann
      }
91 f1048938 Iustin Pop
92 e2715f69 Michael Hanselmann
93 e2715f69 Michael Hanselmann
class _QueuedJob(object):
94 e2715f69 Michael Hanselmann
  """In-memory job representation.
95 e2715f69 Michael Hanselmann

96 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
97 6c5a7090 Michael Hanselmann
  care of by users of this class.
98 e2715f69 Michael Hanselmann

99 e2715f69 Michael Hanselmann
  """
100 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
101 e2715f69 Michael Hanselmann
    if not ops:
102 e2715f69 Michael Hanselmann
      # TODO
103 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
104 e2715f69 Michael Hanselmann
105 85f03e0d Michael Hanselmann
    self.queue = queue
106 f1da30e6 Michael Hanselmann
    self.id = job_id
107 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
108 85f03e0d Michael Hanselmann
    self.run_op_index = -1
109 6c5a7090 Michael Hanselmann
    self.log_serial = 0
110 6c5a7090 Michael Hanselmann
111 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
112 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
113 f1da30e6 Michael Hanselmann
114 f1da30e6 Michael Hanselmann
  @classmethod
115 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
116 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
117 85f03e0d Michael Hanselmann
    obj.queue = queue
118 85f03e0d Michael Hanselmann
    obj.id = state["id"]
119 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
120 6c5a7090 Michael Hanselmann
121 6c5a7090 Michael Hanselmann
    obj.ops = []
122 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
123 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
124 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
125 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
126 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
127 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
128 6c5a7090 Michael Hanselmann
129 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
130 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
131 6c5a7090 Michael Hanselmann
132 f1da30e6 Michael Hanselmann
    return obj
133 f1da30e6 Michael Hanselmann
134 f1da30e6 Michael Hanselmann
  def Serialize(self):
135 f1da30e6 Michael Hanselmann
    return {
136 f1da30e6 Michael Hanselmann
      "id": self.id,
137 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
138 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
139 f1da30e6 Michael Hanselmann
      }
140 f1da30e6 Michael Hanselmann
141 85f03e0d Michael Hanselmann
  def CalcStatus(self):
142 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
143 e2715f69 Michael Hanselmann
144 e2715f69 Michael Hanselmann
    all_success = True
145 85f03e0d Michael Hanselmann
    for op in self.ops:
146 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
147 e2715f69 Michael Hanselmann
        continue
148 e2715f69 Michael Hanselmann
149 e2715f69 Michael Hanselmann
      all_success = False
150 e2715f69 Michael Hanselmann
151 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
152 e2715f69 Michael Hanselmann
        pass
153 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
154 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
155 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
156 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
157 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
158 f1da30e6 Michael Hanselmann
        break
159 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
160 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
161 4cb1d919 Michael Hanselmann
        break
162 e2715f69 Michael Hanselmann
163 e2715f69 Michael Hanselmann
    if all_success:
164 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
165 e2715f69 Michael Hanselmann
166 e2715f69 Michael Hanselmann
    return status
167 e2715f69 Michael Hanselmann
168 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
169 6c5a7090 Michael Hanselmann
    if newer_than is None:
170 6c5a7090 Michael Hanselmann
      serial = -1
171 6c5a7090 Michael Hanselmann
    else:
172 6c5a7090 Michael Hanselmann
      serial = newer_than
173 6c5a7090 Michael Hanselmann
174 6c5a7090 Michael Hanselmann
    entries = []
175 6c5a7090 Michael Hanselmann
    for op in self.ops:
176 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
177 6c5a7090 Michael Hanselmann
178 6c5a7090 Michael Hanselmann
    return entries
179 6c5a7090 Michael Hanselmann
180 f1048938 Iustin Pop
181 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
182 85f03e0d Michael Hanselmann
  def RunTask(self, job):
183 e2715f69 Michael Hanselmann
    """Job executor.
184 e2715f69 Michael Hanselmann

185 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
186 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
187 e2715f69 Michael Hanselmann

188 e2715f69 Michael Hanselmann
    """
189 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
190 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
191 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
192 85f03e0d Michael Hanselmann
    queue = job.queue
193 e2715f69 Michael Hanselmann
    try:
194 85f03e0d Michael Hanselmann
      try:
195 85f03e0d Michael Hanselmann
        count = len(job.ops)
196 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
197 85f03e0d Michael Hanselmann
          try:
198 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
199 85f03e0d Michael Hanselmann
200 85f03e0d Michael Hanselmann
            queue.acquire()
201 85f03e0d Michael Hanselmann
            try:
202 85f03e0d Michael Hanselmann
              job.run_op_index = idx
203 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
204 85f03e0d Michael Hanselmann
              op.result = None
205 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
206 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
207 85f03e0d Michael Hanselmann
208 38206f3c Iustin Pop
              input_opcode = op.input
209 85f03e0d Michael Hanselmann
            finally:
210 85f03e0d Michael Hanselmann
              queue.release()
211 85f03e0d Michael Hanselmann
212 dfe57c22 Michael Hanselmann
            def _Log(*args):
213 6c5a7090 Michael Hanselmann
              """Append a log entry.
214 6c5a7090 Michael Hanselmann

215 6c5a7090 Michael Hanselmann
              """
216 6c5a7090 Michael Hanselmann
              assert len(args) < 3
217 6c5a7090 Michael Hanselmann
218 6c5a7090 Michael Hanselmann
              if len(args) == 1:
219 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
220 6c5a7090 Michael Hanselmann
                log_msg = args[0]
221 6c5a7090 Michael Hanselmann
              else:
222 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
223 6c5a7090 Michael Hanselmann
224 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
225 6c5a7090 Michael Hanselmann
              # precision.
226 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
227 dfe57c22 Michael Hanselmann
228 6c5a7090 Michael Hanselmann
              queue.acquire()
229 dfe57c22 Michael Hanselmann
              try:
230 6c5a7090 Michael Hanselmann
                job.log_serial += 1
231 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
232 6c5a7090 Michael Hanselmann
233 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
234 dfe57c22 Michael Hanselmann
              finally:
235 6c5a7090 Michael Hanselmann
                queue.release()
236 dfe57c22 Michael Hanselmann
237 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
238 dfe57c22 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode, _Log)
239 85f03e0d Michael Hanselmann
240 85f03e0d Michael Hanselmann
            queue.acquire()
241 85f03e0d Michael Hanselmann
            try:
242 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
243 85f03e0d Michael Hanselmann
              op.result = result
244 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
245 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
246 85f03e0d Michael Hanselmann
            finally:
247 85f03e0d Michael Hanselmann
              queue.release()
248 85f03e0d Michael Hanselmann
249 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
250 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
251 85f03e0d Michael Hanselmann
          except Exception, err:
252 85f03e0d Michael Hanselmann
            queue.acquire()
253 85f03e0d Michael Hanselmann
            try:
254 85f03e0d Michael Hanselmann
              try:
255 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
256 85f03e0d Michael Hanselmann
                op.result = str(err)
257 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
258 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
259 85f03e0d Michael Hanselmann
              finally:
260 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
261 85f03e0d Michael Hanselmann
            finally:
262 85f03e0d Michael Hanselmann
              queue.release()
263 85f03e0d Michael Hanselmann
            raise
264 85f03e0d Michael Hanselmann
265 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
266 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
267 85f03e0d Michael Hanselmann
      except:
268 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
269 e2715f69 Michael Hanselmann
    finally:
270 85f03e0d Michael Hanselmann
      queue.acquire()
271 85f03e0d Michael Hanselmann
      try:
272 65548ed5 Michael Hanselmann
        try:
273 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
274 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
275 65548ed5 Michael Hanselmann
        finally:
276 65548ed5 Michael Hanselmann
          job_id = job.id
277 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
278 85f03e0d Michael Hanselmann
      finally:
279 85f03e0d Michael Hanselmann
        queue.release()
280 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
281 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
282 e2715f69 Michael Hanselmann
283 e2715f69 Michael Hanselmann
284 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
285 5bdce580 Michael Hanselmann
  def __init__(self, queue):
286 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
287 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
288 5bdce580 Michael Hanselmann
    self.queue = queue
289 e2715f69 Michael Hanselmann
290 e2715f69 Michael Hanselmann
291 85f03e0d Michael Hanselmann
class JobQueue(object):
292 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
293 f1da30e6 Michael Hanselmann
294 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
295 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
296 db37da70 Michael Hanselmann

297 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
298 db37da70 Michael Hanselmann
    usually called from other classes.
299 db37da70 Michael Hanselmann

300 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
301 db37da70 Michael Hanselmann

302 db37da70 Michael Hanselmann
    Example:
303 db37da70 Michael Hanselmann
      @utils.LockedMethod
304 db37da70 Michael Hanselmann
      @_RequireOpenQueue
305 db37da70 Michael Hanselmann
      def Example(self):
306 db37da70 Michael Hanselmann
        pass
307 db37da70 Michael Hanselmann

308 db37da70 Michael Hanselmann
    """
309 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
310 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
311 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
312 db37da70 Michael Hanselmann
    return wrapper
313 db37da70 Michael Hanselmann
314 85f03e0d Michael Hanselmann
  def __init__(self, context):
315 5bdce580 Michael Hanselmann
    self.context = context
316 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
317 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
318 f1da30e6 Michael Hanselmann
319 85f03e0d Michael Hanselmann
    # Locking
320 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
321 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
322 85f03e0d Michael Hanselmann
    self.release = self._lock.release
323 85f03e0d Michael Hanselmann
324 04ab05ce Michael Hanselmann
    # Initialize
325 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
326 f1da30e6 Michael Hanselmann
327 04ab05ce Michael Hanselmann
    # Read serial file
328 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
329 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
330 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
331 c4beba1c Iustin Pop
332 23752136 Michael Hanselmann
    # Get initial list of nodes
333 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
334 8e00939c Michael Hanselmann
335 8e00939c Michael Hanselmann
    # Remove master node
336 8e00939c Michael Hanselmann
    try:
337 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
338 8e00939c Michael Hanselmann
    except ValueError:
339 8e00939c Michael Hanselmann
      pass
340 23752136 Michael Hanselmann
341 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
342 23752136 Michael Hanselmann
343 85f03e0d Michael Hanselmann
    # Setup worker pool
344 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
345 85f03e0d Michael Hanselmann
346 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
347 85f03e0d Michael Hanselmann
    # we're still doing our work.
348 85f03e0d Michael Hanselmann
    self.acquire()
349 85f03e0d Michael Hanselmann
    try:
350 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
351 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
352 85f03e0d Michael Hanselmann
353 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
354 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
355 85f03e0d Michael Hanselmann
356 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
357 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
358 85f03e0d Michael Hanselmann
          try:
359 85f03e0d Michael Hanselmann
            for op in job.ops:
360 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
361 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
362 85f03e0d Michael Hanselmann
          finally:
363 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
364 85f03e0d Michael Hanselmann
    finally:
365 85f03e0d Michael Hanselmann
      self.release()
366 85f03e0d Michael Hanselmann
367 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
368 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
369 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
370 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
371 23752136 Michael Hanselmann
372 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
373 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
374 23752136 Michael Hanselmann
375 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
376 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
377 23752136 Michael Hanselmann
378 d2e03a33 Michael Hanselmann
    # Upload current serial file
379 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
380 d2e03a33 Michael Hanselmann
381 d2e03a33 Michael Hanselmann
    for file_name in files:
382 9f774ee8 Michael Hanselmann
      # Read file content
383 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
384 9f774ee8 Michael Hanselmann
      try:
385 9f774ee8 Michael Hanselmann
        content = fd.read()
386 9f774ee8 Michael Hanselmann
      finally:
387 9f774ee8 Michael Hanselmann
        fd.close()
388 9f774ee8 Michael Hanselmann
389 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
390 d2e03a33 Michael Hanselmann
      if not result[node_name]:
391 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
392 d2e03a33 Michael Hanselmann
393 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
394 d2e03a33 Michael Hanselmann
395 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
396 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
397 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
398 23752136 Michael Hanselmann
    try:
399 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
400 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
401 d2e03a33 Michael Hanselmann
    except KeyError:
402 23752136 Michael Hanselmann
      pass
403 23752136 Michael Hanselmann
404 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
405 e74798c1 Michael Hanselmann
    failed = []
406 e74798c1 Michael Hanselmann
    success = []
407 e74798c1 Michael Hanselmann
408 e74798c1 Michael Hanselmann
    for node in nodes:
409 e74798c1 Michael Hanselmann
      if result[node]:
410 e74798c1 Michael Hanselmann
        success.append(node)
411 e74798c1 Michael Hanselmann
      else:
412 e74798c1 Michael Hanselmann
        failed.append(node)
413 e74798c1 Michael Hanselmann
414 e74798c1 Michael Hanselmann
    if failed:
415 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
416 e74798c1 Michael Hanselmann
417 e74798c1 Michael Hanselmann
    # +1 for the master node
418 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
419 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
420 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
421 e74798c1 Michael Hanselmann
422 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
423 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
424 8e00939c Michael Hanselmann

425 8e00939c Michael Hanselmann
    """
426 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
427 8e00939c Michael Hanselmann
428 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
429 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
430 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
431 23752136 Michael Hanselmann
432 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
433 abc1f2ce Michael Hanselmann
    os.rename(old, new)
434 abc1f2ce Michael Hanselmann
435 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
436 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
437 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
438 abc1f2ce Michael Hanselmann
439 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
440 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
441 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
442 85f03e0d Michael Hanselmann
    if job_id < 0:
443 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
444 85f03e0d Michael Hanselmann
445 85f03e0d Michael Hanselmann
    return str(job_id)
446 85f03e0d Michael Hanselmann
447 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
448 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
449 f1da30e6 Michael Hanselmann

450 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
451 f1da30e6 Michael Hanselmann

452 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
453 f1da30e6 Michael Hanselmann

454 f1da30e6 Michael Hanselmann
    """
455 f1da30e6 Michael Hanselmann
    # New number
456 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
457 f1da30e6 Michael Hanselmann
458 f1da30e6 Michael Hanselmann
    # Write to file
459 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
460 23752136 Michael Hanselmann
                                        "%s\n" % serial)
461 f1da30e6 Michael Hanselmann
462 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
463 f1da30e6 Michael Hanselmann
    self._last_serial = serial
464 f1da30e6 Michael Hanselmann
465 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
466 f1da30e6 Michael Hanselmann
467 85f03e0d Michael Hanselmann
  @staticmethod
468 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
469 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
470 f1da30e6 Michael Hanselmann
471 85f03e0d Michael Hanselmann
  @staticmethod
472 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
473 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
474 0cb94105 Michael Hanselmann
475 85f03e0d Michael Hanselmann
  @classmethod
476 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
477 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
478 fae737ac Michael Hanselmann
    if m:
479 fae737ac Michael Hanselmann
      return m.group(1)
480 fae737ac Michael Hanselmann
    else:
481 fae737ac Michael Hanselmann
      return None
482 fae737ac Michael Hanselmann
483 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
484 911a495b Iustin Pop
    """Return all known job IDs.
485 911a495b Iustin Pop

486 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
487 911a495b Iustin Pop
    included. Currently this argument is unused.
488 911a495b Iustin Pop

489 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
490 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
491 ac0930b9 Iustin Pop
    extra IDs).
492 ac0930b9 Iustin Pop

493 911a495b Iustin Pop
    """
494 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
495 f0d874fe Iustin Pop
    jlist.sort()
496 f0d874fe Iustin Pop
    return jlist
497 911a495b Iustin Pop
498 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
499 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
500 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
501 f1da30e6 Michael Hanselmann
502 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
503 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
504 5685c1a5 Michael Hanselmann
    if job:
505 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
506 5685c1a5 Michael Hanselmann
      return job
507 ac0930b9 Iustin Pop
508 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
509 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
510 f1da30e6 Michael Hanselmann
    try:
511 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
512 f1da30e6 Michael Hanselmann
    except IOError, err:
513 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
514 f1da30e6 Michael Hanselmann
        return None
515 f1da30e6 Michael Hanselmann
      raise
516 f1da30e6 Michael Hanselmann
    try:
517 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
518 f1da30e6 Michael Hanselmann
    finally:
519 f1da30e6 Michael Hanselmann
      fd.close()
520 f1da30e6 Michael Hanselmann
521 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
522 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
523 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
524 ac0930b9 Iustin Pop
    return job
525 f1da30e6 Michael Hanselmann
526 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
527 911a495b Iustin Pop
    if not job_ids:
528 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
529 f1da30e6 Michael Hanselmann
530 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
531 f1da30e6 Michael Hanselmann
532 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
533 db37da70 Michael Hanselmann
  @_RequireOpenQueue
534 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
535 85f03e0d Michael Hanselmann
    """Create and store a new job.
536 f1da30e6 Michael Hanselmann

537 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
538 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
539 c3f0a12f Iustin Pop

540 c3f0a12f Iustin Pop
    @type ops: list
541 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
542 c3f0a12f Iustin Pop

543 c3f0a12f Iustin Pop
    """
544 f1da30e6 Michael Hanselmann
    # Get job identifier
545 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
546 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
547 f1da30e6 Michael Hanselmann
548 f1da30e6 Michael Hanselmann
    # Write to disk
549 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
550 f1da30e6 Michael Hanselmann
551 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
552 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
553 ac0930b9 Iustin Pop
554 85f03e0d Michael Hanselmann
    # Add to worker pool
555 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
556 85f03e0d Michael Hanselmann
557 85f03e0d Michael Hanselmann
    return job.id
558 f1da30e6 Michael Hanselmann
559 db37da70 Michael Hanselmann
  @_RequireOpenQueue
560 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
561 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
562 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
563 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
564 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
565 ac0930b9 Iustin Pop
566 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
567 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
568 dfe57c22 Michael Hanselmann
569 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
570 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
571 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
572 5c735209 Iustin Pop
                        timeout):
573 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
574 6c5a7090 Michael Hanselmann

575 6c5a7090 Michael Hanselmann
    @type job_id: string
576 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
577 6c5a7090 Michael Hanselmann
    @type fields: list of strings
578 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
579 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
580 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
581 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
582 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
583 5c735209 Iustin Pop
    @type timeout: float
584 5c735209 Iustin Pop
    @param timeout: maximum time to wait
585 6c5a7090 Michael Hanselmann

586 6c5a7090 Michael Hanselmann
    """
587 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
588 5c735209 Iustin Pop
    end_time = time.time() + timeout
589 dfe57c22 Michael Hanselmann
    while True:
590 5c735209 Iustin Pop
      delta_time = end_time - time.time()
591 5c735209 Iustin Pop
      if delta_time < 0:
592 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
593 5c735209 Iustin Pop
594 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
595 6c5a7090 Michael Hanselmann
      if not job:
596 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
597 6c5a7090 Michael Hanselmann
        break
598 dfe57c22 Michael Hanselmann
599 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
600 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
601 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
602 dfe57c22 Michael Hanselmann
603 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
604 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
605 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
606 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
607 dfe57c22 Michael Hanselmann
      # significantly different.
608 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
609 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
610 dfe57c22 Michael Hanselmann
611 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
612 6c5a7090 Michael Hanselmann
                        constants.JOB_STATUS_RUNNING):
613 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
614 6c5a7090 Michael Hanselmann
        # no changes.
615 dfe57c22 Michael Hanselmann
        break
616 dfe57c22 Michael Hanselmann
617 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
618 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
619 6c5a7090 Michael Hanselmann
        break
620 6c5a7090 Michael Hanselmann
621 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
622 6c5a7090 Michael Hanselmann
623 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
624 5c735209 Iustin Pop
      job.change.wait(delta_time)
625 dfe57c22 Michael Hanselmann
626 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
627 dfe57c22 Michael Hanselmann
628 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
629 dfe57c22 Michael Hanselmann
630 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
631 db37da70 Michael Hanselmann
  @_RequireOpenQueue
632 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
633 188c5e0a Michael Hanselmann
    """Cancels a job.
634 188c5e0a Michael Hanselmann

635 188c5e0a Michael Hanselmann
    @type job_id: string
636 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
637 188c5e0a Michael Hanselmann

638 188c5e0a Michael Hanselmann
    """
639 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
640 188c5e0a Michael Hanselmann
641 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
642 188c5e0a Michael Hanselmann
    if not job:
643 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
644 188c5e0a Michael Hanselmann
      return
645 188c5e0a Michael Hanselmann
646 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
647 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
648 188c5e0a Michael Hanselmann
      return
649 188c5e0a Michael Hanselmann
650 85f03e0d Michael Hanselmann
    try:
651 85f03e0d Michael Hanselmann
      for op in job.ops:
652 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
653 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
654 85f03e0d Michael Hanselmann
    finally:
655 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
656 188c5e0a Michael Hanselmann
657 c609f802 Michael Hanselmann
  @utils.LockedMethod
658 db37da70 Michael Hanselmann
  @_RequireOpenQueue
659 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
660 c609f802 Michael Hanselmann
    """Archives a job.
661 c609f802 Michael Hanselmann

662 c609f802 Michael Hanselmann
    @type job_id: string
663 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
664 c609f802 Michael Hanselmann

665 c609f802 Michael Hanselmann
    """
666 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
667 c609f802 Michael Hanselmann
668 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
669 c609f802 Michael Hanselmann
    if not job:
670 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
671 c609f802 Michael Hanselmann
      return
672 c609f802 Michael Hanselmann
673 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
674 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
675 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
676 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
677 c609f802 Michael Hanselmann
      return
678 c609f802 Michael Hanselmann
679 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
680 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
681 c609f802 Michael Hanselmann
682 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
683 c609f802 Michael Hanselmann
684 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
685 f1da30e6 Michael Hanselmann
686 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
687 e2715f69 Michael Hanselmann
    row = []
688 e2715f69 Michael Hanselmann
    for fname in fields:
689 e2715f69 Michael Hanselmann
      if fname == "id":
690 e2715f69 Michael Hanselmann
        row.append(job.id)
691 e2715f69 Michael Hanselmann
      elif fname == "status":
692 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
693 af30b2fd Michael Hanselmann
      elif fname == "ops":
694 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
695 af30b2fd Michael Hanselmann
      elif fname == "opresult":
696 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
697 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
698 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
699 e2715f69 Michael Hanselmann
      else:
700 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
701 e2715f69 Michael Hanselmann
    return row
702 e2715f69 Michael Hanselmann
703 85f03e0d Michael Hanselmann
  @utils.LockedMethod
704 db37da70 Michael Hanselmann
  @_RequireOpenQueue
705 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
706 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
707 e2715f69 Michael Hanselmann

708 e2715f69 Michael Hanselmann
    Args:
709 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
710 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
711 e2715f69 Michael Hanselmann

712 e2715f69 Michael Hanselmann
    """
713 85f03e0d Michael Hanselmann
    jobs = []
714 e2715f69 Michael Hanselmann
715 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
716 85f03e0d Michael Hanselmann
      if job is None:
717 85f03e0d Michael Hanselmann
        jobs.append(None)
718 85f03e0d Michael Hanselmann
      else:
719 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
720 e2715f69 Michael Hanselmann
721 85f03e0d Michael Hanselmann
    return jobs
722 e2715f69 Michael Hanselmann
723 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
724 db37da70 Michael Hanselmann
  @_RequireOpenQueue
725 e2715f69 Michael Hanselmann
  def Shutdown(self):
726 e2715f69 Michael Hanselmann
    """Stops the job queue.
727 e2715f69 Michael Hanselmann

728 e2715f69 Michael Hanselmann
    """
729 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
730 85f03e0d Michael Hanselmann
731 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
732 04ab05ce Michael Hanselmann
    self._queue_lock = None