Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 70552c46

History | View | Annotate | Download (20.2 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 498ae1cc Iustin Pop
37 e2715f69 Michael Hanselmann
from ganeti import constants
38 f1da30e6 Michael Hanselmann
from ganeti import serializer
39 e2715f69 Michael Hanselmann
from ganeti import workerpool
40 f1da30e6 Michael Hanselmann
from ganeti import opcodes
41 7a1ecaed Iustin Pop
from ganeti import errors
42 e2715f69 Michael Hanselmann
from ganeti import mcpu
43 7996a135 Iustin Pop
from ganeti import utils
44 04ab05ce Michael Hanselmann
from ganeti import jstore
45 c3f0a12f Iustin Pop
from ganeti import rpc
46 e2715f69 Michael Hanselmann
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
49 e2715f69 Michael Hanselmann
50 498ae1cc Iustin Pop
51 70552c46 Michael Hanselmann
def TimeStampNow():
52 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
53 70552c46 Michael Hanselmann
54 70552c46 Michael Hanselmann
55 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
56 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
57 e2715f69 Michael Hanselmann

58 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
59 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
60 f1048938 Iustin Pop

61 e2715f69 Michael Hanselmann
  """
62 85f03e0d Michael Hanselmann
  def __init__(self, op):
63 85f03e0d Michael Hanselmann
    self.input = op
64 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
65 85f03e0d Michael Hanselmann
    self.result = None
66 85f03e0d Michael Hanselmann
    self.log = []
67 70552c46 Michael Hanselmann
    self.start_timestamp = None
68 70552c46 Michael Hanselmann
    self.end_timestamp = None
69 f1da30e6 Michael Hanselmann
70 f1da30e6 Michael Hanselmann
  @classmethod
71 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
72 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
73 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
74 85f03e0d Michael Hanselmann
    obj.status = state["status"]
75 85f03e0d Michael Hanselmann
    obj.result = state["result"]
76 85f03e0d Michael Hanselmann
    obj.log = state["log"]
77 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
78 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
79 f1da30e6 Michael Hanselmann
    return obj
80 f1da30e6 Michael Hanselmann
81 f1da30e6 Michael Hanselmann
  def Serialize(self):
82 6c5a7090 Michael Hanselmann
    return {
83 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
84 6c5a7090 Michael Hanselmann
      "status": self.status,
85 6c5a7090 Michael Hanselmann
      "result": self.result,
86 6c5a7090 Michael Hanselmann
      "log": self.log,
87 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
88 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
89 6c5a7090 Michael Hanselmann
      }
90 f1048938 Iustin Pop
91 e2715f69 Michael Hanselmann
92 e2715f69 Michael Hanselmann
class _QueuedJob(object):
93 e2715f69 Michael Hanselmann
  """In-memory job representation.
94 e2715f69 Michael Hanselmann

95 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
96 6c5a7090 Michael Hanselmann
  care of by users of this class.
97 e2715f69 Michael Hanselmann

98 e2715f69 Michael Hanselmann
  """
99 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
100 e2715f69 Michael Hanselmann
    if not ops:
101 e2715f69 Michael Hanselmann
      # TODO
102 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
103 e2715f69 Michael Hanselmann
104 85f03e0d Michael Hanselmann
    self.queue = queue
105 f1da30e6 Michael Hanselmann
    self.id = job_id
106 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
107 85f03e0d Michael Hanselmann
    self.run_op_index = -1
108 6c5a7090 Michael Hanselmann
    self.log_serial = 0
109 6c5a7090 Michael Hanselmann
110 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
111 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
112 f1da30e6 Michael Hanselmann
113 f1da30e6 Michael Hanselmann
  @classmethod
114 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
115 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
116 85f03e0d Michael Hanselmann
    obj.queue = queue
117 85f03e0d Michael Hanselmann
    obj.id = state["id"]
118 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
119 6c5a7090 Michael Hanselmann
120 6c5a7090 Michael Hanselmann
    obj.ops = []
121 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
122 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
123 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
124 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
125 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
126 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
127 6c5a7090 Michael Hanselmann
128 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
129 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
130 6c5a7090 Michael Hanselmann
131 f1da30e6 Michael Hanselmann
    return obj
132 f1da30e6 Michael Hanselmann
133 f1da30e6 Michael Hanselmann
  def Serialize(self):
134 f1da30e6 Michael Hanselmann
    return {
135 f1da30e6 Michael Hanselmann
      "id": self.id,
136 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
137 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
138 f1da30e6 Michael Hanselmann
      }
139 f1da30e6 Michael Hanselmann
140 85f03e0d Michael Hanselmann
  def CalcStatus(self):
141 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
142 e2715f69 Michael Hanselmann
143 e2715f69 Michael Hanselmann
    all_success = True
144 85f03e0d Michael Hanselmann
    for op in self.ops:
145 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
146 e2715f69 Michael Hanselmann
        continue
147 e2715f69 Michael Hanselmann
148 e2715f69 Michael Hanselmann
      all_success = False
149 e2715f69 Michael Hanselmann
150 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
151 e2715f69 Michael Hanselmann
        pass
152 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
153 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
154 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
155 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
156 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
157 f1da30e6 Michael Hanselmann
        break
158 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
159 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
160 4cb1d919 Michael Hanselmann
        break
161 e2715f69 Michael Hanselmann
162 e2715f69 Michael Hanselmann
    if all_success:
163 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
164 e2715f69 Michael Hanselmann
165 e2715f69 Michael Hanselmann
    return status
166 e2715f69 Michael Hanselmann
167 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
168 6c5a7090 Michael Hanselmann
    if newer_than is None:
169 6c5a7090 Michael Hanselmann
      serial = -1
170 6c5a7090 Michael Hanselmann
    else:
171 6c5a7090 Michael Hanselmann
      serial = newer_than
172 6c5a7090 Michael Hanselmann
173 6c5a7090 Michael Hanselmann
    entries = []
174 6c5a7090 Michael Hanselmann
    for op in self.ops:
175 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
176 6c5a7090 Michael Hanselmann
177 6c5a7090 Michael Hanselmann
    return entries
178 6c5a7090 Michael Hanselmann
179 f1048938 Iustin Pop
180 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
181 85f03e0d Michael Hanselmann
  def RunTask(self, job):
182 e2715f69 Michael Hanselmann
    """Job executor.
183 e2715f69 Michael Hanselmann

184 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
185 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
186 e2715f69 Michael Hanselmann

187 e2715f69 Michael Hanselmann
    """
188 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
189 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
190 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
191 85f03e0d Michael Hanselmann
    queue = job.queue
192 e2715f69 Michael Hanselmann
    try:
193 85f03e0d Michael Hanselmann
      try:
194 85f03e0d Michael Hanselmann
        count = len(job.ops)
195 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
196 85f03e0d Michael Hanselmann
          try:
197 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
198 85f03e0d Michael Hanselmann
199 85f03e0d Michael Hanselmann
            queue.acquire()
200 85f03e0d Michael Hanselmann
            try:
201 85f03e0d Michael Hanselmann
              job.run_op_index = idx
202 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
203 85f03e0d Michael Hanselmann
              op.result = None
204 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
205 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
206 85f03e0d Michael Hanselmann
207 38206f3c Iustin Pop
              input_opcode = op.input
208 85f03e0d Michael Hanselmann
            finally:
209 85f03e0d Michael Hanselmann
              queue.release()
210 85f03e0d Michael Hanselmann
211 dfe57c22 Michael Hanselmann
            def _Log(*args):
212 6c5a7090 Michael Hanselmann
              """Append a log entry.
213 6c5a7090 Michael Hanselmann

214 6c5a7090 Michael Hanselmann
              """
215 6c5a7090 Michael Hanselmann
              assert len(args) < 3
216 6c5a7090 Michael Hanselmann
217 6c5a7090 Michael Hanselmann
              if len(args) == 1:
218 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
219 6c5a7090 Michael Hanselmann
                log_msg = args[0]
220 6c5a7090 Michael Hanselmann
              else:
221 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
222 6c5a7090 Michael Hanselmann
223 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
224 6c5a7090 Michael Hanselmann
              # precision.
225 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
226 dfe57c22 Michael Hanselmann
227 6c5a7090 Michael Hanselmann
              queue.acquire()
228 dfe57c22 Michael Hanselmann
              try:
229 6c5a7090 Michael Hanselmann
                job.log_serial += 1
230 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
231 6c5a7090 Michael Hanselmann
232 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
233 dfe57c22 Michael Hanselmann
              finally:
234 6c5a7090 Michael Hanselmann
                queue.release()
235 dfe57c22 Michael Hanselmann
236 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
237 dfe57c22 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode, _Log)
238 85f03e0d Michael Hanselmann
239 85f03e0d Michael Hanselmann
            queue.acquire()
240 85f03e0d Michael Hanselmann
            try:
241 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
242 85f03e0d Michael Hanselmann
              op.result = result
243 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
244 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
245 85f03e0d Michael Hanselmann
            finally:
246 85f03e0d Michael Hanselmann
              queue.release()
247 85f03e0d Michael Hanselmann
248 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
249 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
250 85f03e0d Michael Hanselmann
          except Exception, err:
251 85f03e0d Michael Hanselmann
            queue.acquire()
252 85f03e0d Michael Hanselmann
            try:
253 85f03e0d Michael Hanselmann
              try:
254 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
255 85f03e0d Michael Hanselmann
                op.result = str(err)
256 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
257 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
258 85f03e0d Michael Hanselmann
              finally:
259 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
260 85f03e0d Michael Hanselmann
            finally:
261 85f03e0d Michael Hanselmann
              queue.release()
262 85f03e0d Michael Hanselmann
            raise
263 85f03e0d Michael Hanselmann
264 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
265 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
266 85f03e0d Michael Hanselmann
      except:
267 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
268 e2715f69 Michael Hanselmann
    finally:
269 85f03e0d Michael Hanselmann
      queue.acquire()
270 85f03e0d Michael Hanselmann
      try:
271 65548ed5 Michael Hanselmann
        try:
272 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
273 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
274 65548ed5 Michael Hanselmann
        finally:
275 65548ed5 Michael Hanselmann
          job_id = job.id
276 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
277 85f03e0d Michael Hanselmann
      finally:
278 85f03e0d Michael Hanselmann
        queue.release()
279 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
280 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
281 e2715f69 Michael Hanselmann
282 e2715f69 Michael Hanselmann
283 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
284 5bdce580 Michael Hanselmann
  def __init__(self, queue):
285 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
286 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
287 5bdce580 Michael Hanselmann
    self.queue = queue
288 e2715f69 Michael Hanselmann
289 e2715f69 Michael Hanselmann
290 85f03e0d Michael Hanselmann
class JobQueue(object):
291 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
292 f1da30e6 Michael Hanselmann
293 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
294 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
295 db37da70 Michael Hanselmann

296 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
297 db37da70 Michael Hanselmann
    usually called from other classes.
298 db37da70 Michael Hanselmann

299 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
300 db37da70 Michael Hanselmann

301 db37da70 Michael Hanselmann
    Example:
302 db37da70 Michael Hanselmann
      @utils.LockedMethod
303 db37da70 Michael Hanselmann
      @_RequireOpenQueue
304 db37da70 Michael Hanselmann
      def Example(self):
305 db37da70 Michael Hanselmann
        pass
306 db37da70 Michael Hanselmann

307 db37da70 Michael Hanselmann
    """
308 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
309 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
310 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
311 db37da70 Michael Hanselmann
    return wrapper
312 db37da70 Michael Hanselmann
313 85f03e0d Michael Hanselmann
  def __init__(self, context):
314 5bdce580 Michael Hanselmann
    self.context = context
315 ac0930b9 Iustin Pop
    self._memcache = {}
316 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
317 f1da30e6 Michael Hanselmann
318 85f03e0d Michael Hanselmann
    # Locking
319 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
320 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
321 85f03e0d Michael Hanselmann
    self.release = self._lock.release
322 85f03e0d Michael Hanselmann
323 04ab05ce Michael Hanselmann
    # Initialize
324 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
325 f1da30e6 Michael Hanselmann
326 04ab05ce Michael Hanselmann
    # Read serial file
327 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
328 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
329 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
330 c4beba1c Iustin Pop
331 23752136 Michael Hanselmann
    # Get initial list of nodes
332 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
333 8e00939c Michael Hanselmann
334 8e00939c Michael Hanselmann
    # Remove master node
335 8e00939c Michael Hanselmann
    try:
336 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
337 8e00939c Michael Hanselmann
    except ValueError:
338 8e00939c Michael Hanselmann
      pass
339 23752136 Michael Hanselmann
340 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
341 23752136 Michael Hanselmann
342 85f03e0d Michael Hanselmann
    # Setup worker pool
343 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
344 85f03e0d Michael Hanselmann
345 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
346 85f03e0d Michael Hanselmann
    # we're still doing our work.
347 85f03e0d Michael Hanselmann
    self.acquire()
348 85f03e0d Michael Hanselmann
    try:
349 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
350 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
351 85f03e0d Michael Hanselmann
352 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
353 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
354 85f03e0d Michael Hanselmann
355 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
356 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
357 85f03e0d Michael Hanselmann
          try:
358 85f03e0d Michael Hanselmann
            for op in job.ops:
359 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
360 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
361 85f03e0d Michael Hanselmann
          finally:
362 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
363 85f03e0d Michael Hanselmann
    finally:
364 85f03e0d Michael Hanselmann
      self.release()
365 85f03e0d Michael Hanselmann
366 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
367 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
368 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
369 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
370 23752136 Michael Hanselmann
371 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
372 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
373 23752136 Michael Hanselmann
374 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
375 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
376 23752136 Michael Hanselmann
377 d2e03a33 Michael Hanselmann
    # Upload current serial file
378 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
379 d2e03a33 Michael Hanselmann
380 d2e03a33 Michael Hanselmann
    for file_name in files:
381 9f774ee8 Michael Hanselmann
      # Read file content
382 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
383 9f774ee8 Michael Hanselmann
      try:
384 9f774ee8 Michael Hanselmann
        content = fd.read()
385 9f774ee8 Michael Hanselmann
      finally:
386 9f774ee8 Michael Hanselmann
        fd.close()
387 9f774ee8 Michael Hanselmann
388 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
389 d2e03a33 Michael Hanselmann
      if not result[node_name]:
390 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
391 d2e03a33 Michael Hanselmann
392 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
393 d2e03a33 Michael Hanselmann
394 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
395 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
396 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
397 23752136 Michael Hanselmann
    try:
398 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
399 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
400 d2e03a33 Michael Hanselmann
    except KeyError:
401 23752136 Michael Hanselmann
      pass
402 23752136 Michael Hanselmann
403 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
404 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
405 8e00939c Michael Hanselmann

406 8e00939c Michael Hanselmann
    """
407 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
408 8e00939c Michael Hanselmann
409 23752136 Michael Hanselmann
    failed_nodes = 0
410 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
411 8e00939c Michael Hanselmann
    for node in self._nodes:
412 23752136 Michael Hanselmann
      if not result[node]:
413 23752136 Michael Hanselmann
        failed_nodes += 1
414 23752136 Michael Hanselmann
        logging.error("Copy of job queue file to node %s failed", node)
415 23752136 Michael Hanselmann
416 23752136 Michael Hanselmann
    # TODO: check failed_nodes
417 23752136 Michael Hanselmann
418 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
419 abc1f2ce Michael Hanselmann
    os.rename(old, new)
420 abc1f2ce Michael Hanselmann
421 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
422 abc1f2ce Michael Hanselmann
    for node in self._nodes:
423 abc1f2ce Michael Hanselmann
      if not result[node]:
424 abc1f2ce Michael Hanselmann
        logging.error("Moving %s to %s failed on %s", old, new, node)
425 abc1f2ce Michael Hanselmann
426 abc1f2ce Michael Hanselmann
    # TODO: check failed nodes
427 abc1f2ce Michael Hanselmann
428 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
429 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
430 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
431 85f03e0d Michael Hanselmann
    if job_id < 0:
432 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
433 85f03e0d Michael Hanselmann
434 85f03e0d Michael Hanselmann
    return str(job_id)
435 85f03e0d Michael Hanselmann
436 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
437 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
438 f1da30e6 Michael Hanselmann

439 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
440 f1da30e6 Michael Hanselmann

441 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
442 f1da30e6 Michael Hanselmann

443 f1da30e6 Michael Hanselmann
    """
444 f1da30e6 Michael Hanselmann
    # New number
445 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
446 f1da30e6 Michael Hanselmann
447 f1da30e6 Michael Hanselmann
    # Write to file
448 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
449 23752136 Michael Hanselmann
                                        "%s\n" % serial)
450 f1da30e6 Michael Hanselmann
451 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
452 f1da30e6 Michael Hanselmann
    self._last_serial = serial
453 f1da30e6 Michael Hanselmann
454 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
455 f1da30e6 Michael Hanselmann
456 85f03e0d Michael Hanselmann
  @staticmethod
457 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
458 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
459 f1da30e6 Michael Hanselmann
460 85f03e0d Michael Hanselmann
  @staticmethod
461 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
462 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
463 0cb94105 Michael Hanselmann
464 85f03e0d Michael Hanselmann
  @classmethod
465 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
466 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
467 fae737ac Michael Hanselmann
    if m:
468 fae737ac Michael Hanselmann
      return m.group(1)
469 fae737ac Michael Hanselmann
    else:
470 fae737ac Michael Hanselmann
      return None
471 fae737ac Michael Hanselmann
472 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
473 911a495b Iustin Pop
    """Return all known job IDs.
474 911a495b Iustin Pop

475 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
476 911a495b Iustin Pop
    included. Currently this argument is unused.
477 911a495b Iustin Pop

478 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
479 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
480 ac0930b9 Iustin Pop
    extra IDs).
481 ac0930b9 Iustin Pop

482 911a495b Iustin Pop
    """
483 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
484 f0d874fe Iustin Pop
    jlist.sort()
485 f0d874fe Iustin Pop
    return jlist
486 911a495b Iustin Pop
487 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
488 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
489 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
490 f1da30e6 Michael Hanselmann
491 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
492 ac0930b9 Iustin Pop
    if job_id in self._memcache:
493 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
494 ac0930b9 Iustin Pop
      return self._memcache[job_id]
495 ac0930b9 Iustin Pop
496 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
497 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
498 f1da30e6 Michael Hanselmann
    try:
499 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
500 f1da30e6 Michael Hanselmann
    except IOError, err:
501 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
502 f1da30e6 Michael Hanselmann
        return None
503 f1da30e6 Michael Hanselmann
      raise
504 f1da30e6 Michael Hanselmann
    try:
505 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
506 f1da30e6 Michael Hanselmann
    finally:
507 f1da30e6 Michael Hanselmann
      fd.close()
508 f1da30e6 Michael Hanselmann
509 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
510 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
511 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
512 ac0930b9 Iustin Pop
    return job
513 f1da30e6 Michael Hanselmann
514 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
515 911a495b Iustin Pop
    if not job_ids:
516 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
517 f1da30e6 Michael Hanselmann
518 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
519 f1da30e6 Michael Hanselmann
520 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
521 db37da70 Michael Hanselmann
  @_RequireOpenQueue
522 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
523 85f03e0d Michael Hanselmann
    """Create and store a new job.
524 f1da30e6 Michael Hanselmann

525 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
526 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
527 c3f0a12f Iustin Pop

528 c3f0a12f Iustin Pop
    @type ops: list
529 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
530 c3f0a12f Iustin Pop

531 c3f0a12f Iustin Pop
    """
532 f1da30e6 Michael Hanselmann
    # Get job identifier
533 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
534 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
535 f1da30e6 Michael Hanselmann
536 f1da30e6 Michael Hanselmann
    # Write to disk
537 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
538 f1da30e6 Michael Hanselmann
539 205d71fd Michael Hanselmann
    logging.debug("Added new job %s to the cache", job_id)
540 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
541 ac0930b9 Iustin Pop
542 85f03e0d Michael Hanselmann
    # Add to worker pool
543 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
544 85f03e0d Michael Hanselmann
545 85f03e0d Michael Hanselmann
    return job.id
546 f1da30e6 Michael Hanselmann
547 db37da70 Michael Hanselmann
  @_RequireOpenQueue
548 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
549 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
550 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
551 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
552 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
553 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
554 ac0930b9 Iustin Pop
555 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
556 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
557 dfe57c22 Michael Hanselmann
558 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
559 ac0930b9 Iustin Pop
    """Clean the memory cache.
560 ac0930b9 Iustin Pop

561 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
562 ac0930b9 Iustin Pop
    cleaned.
563 ac0930b9 Iustin Pop

564 ac0930b9 Iustin Pop
    """
565 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
566 85f03e0d Michael Hanselmann
567 ac0930b9 Iustin Pop
    for job in self._memcache.values():
568 57f8615f Michael Hanselmann
      if job.id in exclude:
569 ac0930b9 Iustin Pop
        continue
570 85f03e0d Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
571 85f03e0d Michael Hanselmann
                                  constants.JOB_STATUS_RUNNING):
572 205d71fd Michael Hanselmann
        logging.debug("Cleaning job %s from the cache", job.id)
573 ac0930b9 Iustin Pop
        try:
574 ac0930b9 Iustin Pop
          del self._memcache[job.id]
575 ac0930b9 Iustin Pop
        except KeyError:
576 ac0930b9 Iustin Pop
          pass
577 f1da30e6 Michael Hanselmann
578 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
579 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
580 6c5a7090 Michael Hanselmann
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
581 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
582 6c5a7090 Michael Hanselmann

583 6c5a7090 Michael Hanselmann
    @type job_id: string
584 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
585 6c5a7090 Michael Hanselmann
    @type fields: list of strings
586 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
587 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
588 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
589 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
590 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
591 6c5a7090 Michael Hanselmann

592 6c5a7090 Michael Hanselmann
    """
593 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
594 dfe57c22 Michael Hanselmann
595 dfe57c22 Michael Hanselmann
    while True:
596 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
597 6c5a7090 Michael Hanselmann
      if not job:
598 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
599 6c5a7090 Michael Hanselmann
        new_state = None
600 6c5a7090 Michael Hanselmann
        break
601 dfe57c22 Michael Hanselmann
602 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
603 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
604 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
605 dfe57c22 Michael Hanselmann
606 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
607 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
608 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
609 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
610 dfe57c22 Michael Hanselmann
      # significantly different.
611 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
612 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
613 dfe57c22 Michael Hanselmann
614 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
615 6c5a7090 Michael Hanselmann
                        constants.JOB_STATUS_RUNNING):
616 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
617 6c5a7090 Michael Hanselmann
        # no changes.
618 dfe57c22 Michael Hanselmann
        break
619 dfe57c22 Michael Hanselmann
620 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
621 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
622 6c5a7090 Michael Hanselmann
        break
623 6c5a7090 Michael Hanselmann
624 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
625 6c5a7090 Michael Hanselmann
626 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
627 6c5a7090 Michael Hanselmann
      job.change.wait()
628 dfe57c22 Michael Hanselmann
629 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
630 dfe57c22 Michael Hanselmann
631 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
632 dfe57c22 Michael Hanselmann
633 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
634 db37da70 Michael Hanselmann
  @_RequireOpenQueue
635 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
636 188c5e0a Michael Hanselmann
    """Cancels a job.
637 188c5e0a Michael Hanselmann

638 188c5e0a Michael Hanselmann
    @type job_id: string
639 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
640 188c5e0a Michael Hanselmann

641 188c5e0a Michael Hanselmann
    """
642 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
643 188c5e0a Michael Hanselmann
644 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
645 188c5e0a Michael Hanselmann
    if not job:
646 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
647 188c5e0a Michael Hanselmann
      return
648 188c5e0a Michael Hanselmann
649 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
650 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
651 188c5e0a Michael Hanselmann
      return
652 188c5e0a Michael Hanselmann
653 85f03e0d Michael Hanselmann
    try:
654 85f03e0d Michael Hanselmann
      for op in job.ops:
655 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
656 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
657 85f03e0d Michael Hanselmann
    finally:
658 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
659 188c5e0a Michael Hanselmann
660 c609f802 Michael Hanselmann
  @utils.LockedMethod
661 db37da70 Michael Hanselmann
  @_RequireOpenQueue
662 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
663 c609f802 Michael Hanselmann
    """Archives a job.
664 c609f802 Michael Hanselmann

665 c609f802 Michael Hanselmann
    @type job_id: string
666 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
667 c609f802 Michael Hanselmann

668 c609f802 Michael Hanselmann
    """
669 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
670 c609f802 Michael Hanselmann
671 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
672 c609f802 Michael Hanselmann
    if not job:
673 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
674 c609f802 Michael Hanselmann
      return
675 c609f802 Michael Hanselmann
676 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
677 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
678 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
679 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
680 c609f802 Michael Hanselmann
      return
681 c609f802 Michael Hanselmann
682 c609f802 Michael Hanselmann
    try:
683 c609f802 Michael Hanselmann
      old = self._GetJobPath(job.id)
684 c609f802 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
685 c609f802 Michael Hanselmann
686 abc1f2ce Michael Hanselmann
      self._RenameFileUnlocked(old, new)
687 c609f802 Michael Hanselmann
688 c609f802 Michael Hanselmann
      logging.debug("Successfully archived job %s", job.id)
689 c609f802 Michael Hanselmann
    finally:
690 c609f802 Michael Hanselmann
      # Cleaning the cache because we don't know what os.rename actually did
691 c609f802 Michael Hanselmann
      # and to be on the safe side.
692 c609f802 Michael Hanselmann
      self._CleanCacheUnlocked([])
693 f1da30e6 Michael Hanselmann
694 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
695 e2715f69 Michael Hanselmann
    row = []
696 e2715f69 Michael Hanselmann
    for fname in fields:
697 e2715f69 Michael Hanselmann
      if fname == "id":
698 e2715f69 Michael Hanselmann
        row.append(job.id)
699 e2715f69 Michael Hanselmann
      elif fname == "status":
700 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
701 af30b2fd Michael Hanselmann
      elif fname == "ops":
702 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
703 af30b2fd Michael Hanselmann
      elif fname == "opresult":
704 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
705 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
706 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
707 e2715f69 Michael Hanselmann
      else:
708 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
709 e2715f69 Michael Hanselmann
    return row
710 e2715f69 Michael Hanselmann
711 85f03e0d Michael Hanselmann
  @utils.LockedMethod
712 db37da70 Michael Hanselmann
  @_RequireOpenQueue
713 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
714 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
715 e2715f69 Michael Hanselmann

716 e2715f69 Michael Hanselmann
    Args:
717 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
718 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
719 e2715f69 Michael Hanselmann

720 e2715f69 Michael Hanselmann
    """
721 85f03e0d Michael Hanselmann
    jobs = []
722 e2715f69 Michael Hanselmann
723 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
724 85f03e0d Michael Hanselmann
      if job is None:
725 85f03e0d Michael Hanselmann
        jobs.append(None)
726 85f03e0d Michael Hanselmann
      else:
727 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
728 e2715f69 Michael Hanselmann
729 85f03e0d Michael Hanselmann
    return jobs
730 e2715f69 Michael Hanselmann
731 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
732 db37da70 Michael Hanselmann
  @_RequireOpenQueue
733 e2715f69 Michael Hanselmann
  def Shutdown(self):
734 e2715f69 Michael Hanselmann
    """Stops the job queue.
735 e2715f69 Michael Hanselmann

736 e2715f69 Michael Hanselmann
    """
737 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
738 85f03e0d Michael Hanselmann
739 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
740 04ab05ce Michael Hanselmann
    self._queue_lock = None