Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 1daae384

History | View | Annotate | Download (21.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
49 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
50 e2715f69 Michael Hanselmann
51 498ae1cc Iustin Pop
52 70552c46 Michael Hanselmann
def TimeStampNow():
53 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
54 70552c46 Michael Hanselmann
55 70552c46 Michael Hanselmann
56 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
57 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
58 e2715f69 Michael Hanselmann

59 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
60 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
61 f1048938 Iustin Pop

62 e2715f69 Michael Hanselmann
  """
63 85f03e0d Michael Hanselmann
  def __init__(self, op):
64 85f03e0d Michael Hanselmann
    self.input = op
65 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
66 85f03e0d Michael Hanselmann
    self.result = None
67 85f03e0d Michael Hanselmann
    self.log = []
68 70552c46 Michael Hanselmann
    self.start_timestamp = None
69 70552c46 Michael Hanselmann
    self.end_timestamp = None
70 f1da30e6 Michael Hanselmann
71 f1da30e6 Michael Hanselmann
  @classmethod
72 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
73 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
74 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75 85f03e0d Michael Hanselmann
    obj.status = state["status"]
76 85f03e0d Michael Hanselmann
    obj.result = state["result"]
77 85f03e0d Michael Hanselmann
    obj.log = state["log"]
78 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
79 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
80 f1da30e6 Michael Hanselmann
    return obj
81 f1da30e6 Michael Hanselmann
82 f1da30e6 Michael Hanselmann
  def Serialize(self):
83 6c5a7090 Michael Hanselmann
    return {
84 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
85 6c5a7090 Michael Hanselmann
      "status": self.status,
86 6c5a7090 Michael Hanselmann
      "result": self.result,
87 6c5a7090 Michael Hanselmann
      "log": self.log,
88 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
89 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
90 6c5a7090 Michael Hanselmann
      }
91 f1048938 Iustin Pop
92 e2715f69 Michael Hanselmann
93 e2715f69 Michael Hanselmann
class _QueuedJob(object):
94 e2715f69 Michael Hanselmann
  """In-memory job representation.
95 e2715f69 Michael Hanselmann

96 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
97 6c5a7090 Michael Hanselmann
  care of by users of this class.
98 e2715f69 Michael Hanselmann

99 e2715f69 Michael Hanselmann
  """
100 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
101 e2715f69 Michael Hanselmann
    if not ops:
102 e2715f69 Michael Hanselmann
      # TODO
103 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
104 e2715f69 Michael Hanselmann
105 85f03e0d Michael Hanselmann
    self.queue = queue
106 f1da30e6 Michael Hanselmann
    self.id = job_id
107 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
108 85f03e0d Michael Hanselmann
    self.run_op_index = -1
109 6c5a7090 Michael Hanselmann
    self.log_serial = 0
110 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
111 c56ec146 Iustin Pop
    self.start_timestamp = None
112 c56ec146 Iustin Pop
    self.end_timestamp = None
113 6c5a7090 Michael Hanselmann
114 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
115 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
116 f1da30e6 Michael Hanselmann
117 f1da30e6 Michael Hanselmann
  @classmethod
118 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
119 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
120 85f03e0d Michael Hanselmann
    obj.queue = queue
121 85f03e0d Michael Hanselmann
    obj.id = state["id"]
122 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
123 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
124 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
125 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
126 6c5a7090 Michael Hanselmann
127 6c5a7090 Michael Hanselmann
    obj.ops = []
128 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
129 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
130 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
131 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
132 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
133 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
134 6c5a7090 Michael Hanselmann
135 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
136 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
137 6c5a7090 Michael Hanselmann
138 f1da30e6 Michael Hanselmann
    return obj
139 f1da30e6 Michael Hanselmann
140 f1da30e6 Michael Hanselmann
  def Serialize(self):
141 f1da30e6 Michael Hanselmann
    return {
142 f1da30e6 Michael Hanselmann
      "id": self.id,
143 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
144 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
145 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
146 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
147 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
148 f1da30e6 Michael Hanselmann
      }
149 f1da30e6 Michael Hanselmann
150 85f03e0d Michael Hanselmann
  def CalcStatus(self):
151 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
152 e2715f69 Michael Hanselmann
153 e2715f69 Michael Hanselmann
    all_success = True
154 85f03e0d Michael Hanselmann
    for op in self.ops:
155 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
156 e2715f69 Michael Hanselmann
        continue
157 e2715f69 Michael Hanselmann
158 e2715f69 Michael Hanselmann
      all_success = False
159 e2715f69 Michael Hanselmann
160 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
161 e2715f69 Michael Hanselmann
        pass
162 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
163 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
164 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
165 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
166 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
167 f1da30e6 Michael Hanselmann
        break
168 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
169 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
170 4cb1d919 Michael Hanselmann
        break
171 e2715f69 Michael Hanselmann
172 e2715f69 Michael Hanselmann
    if all_success:
173 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
174 e2715f69 Michael Hanselmann
175 e2715f69 Michael Hanselmann
    return status
176 e2715f69 Michael Hanselmann
177 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
178 6c5a7090 Michael Hanselmann
    if newer_than is None:
179 6c5a7090 Michael Hanselmann
      serial = -1
180 6c5a7090 Michael Hanselmann
    else:
181 6c5a7090 Michael Hanselmann
      serial = newer_than
182 6c5a7090 Michael Hanselmann
183 6c5a7090 Michael Hanselmann
    entries = []
184 6c5a7090 Michael Hanselmann
    for op in self.ops:
185 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
186 6c5a7090 Michael Hanselmann
187 6c5a7090 Michael Hanselmann
    return entries
188 6c5a7090 Michael Hanselmann
189 f1048938 Iustin Pop
190 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
191 85f03e0d Michael Hanselmann
  def RunTask(self, job):
192 e2715f69 Michael Hanselmann
    """Job executor.
193 e2715f69 Michael Hanselmann

194 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
195 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
196 e2715f69 Michael Hanselmann

197 e2715f69 Michael Hanselmann
    """
198 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
199 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
200 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
201 85f03e0d Michael Hanselmann
    queue = job.queue
202 e2715f69 Michael Hanselmann
    try:
203 85f03e0d Michael Hanselmann
      try:
204 85f03e0d Michael Hanselmann
        count = len(job.ops)
205 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
206 85f03e0d Michael Hanselmann
          try:
207 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
208 85f03e0d Michael Hanselmann
209 85f03e0d Michael Hanselmann
            queue.acquire()
210 85f03e0d Michael Hanselmann
            try:
211 85f03e0d Michael Hanselmann
              job.run_op_index = idx
212 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
213 85f03e0d Michael Hanselmann
              op.result = None
214 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
215 c56ec146 Iustin Pop
              if idx == 0: # first opcode
216 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
217 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
218 85f03e0d Michael Hanselmann
219 38206f3c Iustin Pop
              input_opcode = op.input
220 85f03e0d Michael Hanselmann
            finally:
221 85f03e0d Michael Hanselmann
              queue.release()
222 85f03e0d Michael Hanselmann
223 dfe57c22 Michael Hanselmann
            def _Log(*args):
224 6c5a7090 Michael Hanselmann
              """Append a log entry.
225 6c5a7090 Michael Hanselmann

226 6c5a7090 Michael Hanselmann
              """
227 6c5a7090 Michael Hanselmann
              assert len(args) < 3
228 6c5a7090 Michael Hanselmann
229 6c5a7090 Michael Hanselmann
              if len(args) == 1:
230 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
231 6c5a7090 Michael Hanselmann
                log_msg = args[0]
232 6c5a7090 Michael Hanselmann
              else:
233 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
234 6c5a7090 Michael Hanselmann
235 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
236 6c5a7090 Michael Hanselmann
              # precision.
237 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
238 dfe57c22 Michael Hanselmann
239 6c5a7090 Michael Hanselmann
              queue.acquire()
240 dfe57c22 Michael Hanselmann
              try:
241 6c5a7090 Michael Hanselmann
                job.log_serial += 1
242 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
243 6c5a7090 Michael Hanselmann
244 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
245 dfe57c22 Michael Hanselmann
              finally:
246 6c5a7090 Michael Hanselmann
                queue.release()
247 dfe57c22 Michael Hanselmann
248 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
249 dfe57c22 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode, _Log)
250 85f03e0d Michael Hanselmann
251 85f03e0d Michael Hanselmann
            queue.acquire()
252 85f03e0d Michael Hanselmann
            try:
253 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
254 85f03e0d Michael Hanselmann
              op.result = result
255 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
256 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
257 85f03e0d Michael Hanselmann
            finally:
258 85f03e0d Michael Hanselmann
              queue.release()
259 85f03e0d Michael Hanselmann
260 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
261 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
262 85f03e0d Michael Hanselmann
          except Exception, err:
263 85f03e0d Michael Hanselmann
            queue.acquire()
264 85f03e0d Michael Hanselmann
            try:
265 85f03e0d Michael Hanselmann
              try:
266 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
267 85f03e0d Michael Hanselmann
                op.result = str(err)
268 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
269 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
270 85f03e0d Michael Hanselmann
              finally:
271 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
272 85f03e0d Michael Hanselmann
            finally:
273 85f03e0d Michael Hanselmann
              queue.release()
274 85f03e0d Michael Hanselmann
            raise
275 85f03e0d Michael Hanselmann
276 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
277 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
278 85f03e0d Michael Hanselmann
      except:
279 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
280 e2715f69 Michael Hanselmann
    finally:
281 85f03e0d Michael Hanselmann
      queue.acquire()
282 85f03e0d Michael Hanselmann
      try:
283 65548ed5 Michael Hanselmann
        try:
284 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
285 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
286 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
287 65548ed5 Michael Hanselmann
        finally:
288 65548ed5 Michael Hanselmann
          job_id = job.id
289 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
290 85f03e0d Michael Hanselmann
      finally:
291 85f03e0d Michael Hanselmann
        queue.release()
292 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
293 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
294 e2715f69 Michael Hanselmann
295 e2715f69 Michael Hanselmann
296 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
297 5bdce580 Michael Hanselmann
  def __init__(self, queue):
298 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
299 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
300 5bdce580 Michael Hanselmann
    self.queue = queue
301 e2715f69 Michael Hanselmann
302 e2715f69 Michael Hanselmann
303 85f03e0d Michael Hanselmann
class JobQueue(object):
304 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
305 f1da30e6 Michael Hanselmann
306 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
307 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
308 db37da70 Michael Hanselmann

309 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
310 db37da70 Michael Hanselmann
    usually called from other classes.
311 db37da70 Michael Hanselmann

312 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
313 db37da70 Michael Hanselmann

314 db37da70 Michael Hanselmann
    Example:
315 db37da70 Michael Hanselmann
      @utils.LockedMethod
316 db37da70 Michael Hanselmann
      @_RequireOpenQueue
317 db37da70 Michael Hanselmann
      def Example(self):
318 db37da70 Michael Hanselmann
        pass
319 db37da70 Michael Hanselmann

320 db37da70 Michael Hanselmann
    """
321 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
322 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
323 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
324 db37da70 Michael Hanselmann
    return wrapper
325 db37da70 Michael Hanselmann
326 85f03e0d Michael Hanselmann
  def __init__(self, context):
327 5bdce580 Michael Hanselmann
    self.context = context
328 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
329 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
330 f1da30e6 Michael Hanselmann
331 85f03e0d Michael Hanselmann
    # Locking
332 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
333 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
334 85f03e0d Michael Hanselmann
    self.release = self._lock.release
335 85f03e0d Michael Hanselmann
336 04ab05ce Michael Hanselmann
    # Initialize
337 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
338 f1da30e6 Michael Hanselmann
339 04ab05ce Michael Hanselmann
    # Read serial file
340 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
341 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
342 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
343 c4beba1c Iustin Pop
344 23752136 Michael Hanselmann
    # Get initial list of nodes
345 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
346 8e00939c Michael Hanselmann
347 8e00939c Michael Hanselmann
    # Remove master node
348 8e00939c Michael Hanselmann
    try:
349 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
350 8e00939c Michael Hanselmann
    except ValueError:
351 8e00939c Michael Hanselmann
      pass
352 23752136 Michael Hanselmann
353 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
354 23752136 Michael Hanselmann
355 85f03e0d Michael Hanselmann
    # Setup worker pool
356 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
357 85f03e0d Michael Hanselmann
358 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
359 85f03e0d Michael Hanselmann
    # we're still doing our work.
360 85f03e0d Michael Hanselmann
    self.acquire()
361 85f03e0d Michael Hanselmann
    try:
362 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
363 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
364 85f03e0d Michael Hanselmann
365 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
366 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
367 85f03e0d Michael Hanselmann
368 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
369 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
370 85f03e0d Michael Hanselmann
          try:
371 85f03e0d Michael Hanselmann
            for op in job.ops:
372 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
373 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
374 85f03e0d Michael Hanselmann
          finally:
375 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
376 85f03e0d Michael Hanselmann
    finally:
377 85f03e0d Michael Hanselmann
      self.release()
378 85f03e0d Michael Hanselmann
379 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
380 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
381 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
382 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
383 23752136 Michael Hanselmann
384 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
385 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
386 23752136 Michael Hanselmann
387 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
388 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
389 23752136 Michael Hanselmann
390 d2e03a33 Michael Hanselmann
    # Upload current serial file
391 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
392 d2e03a33 Michael Hanselmann
393 d2e03a33 Michael Hanselmann
    for file_name in files:
394 9f774ee8 Michael Hanselmann
      # Read file content
395 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
396 9f774ee8 Michael Hanselmann
      try:
397 9f774ee8 Michael Hanselmann
        content = fd.read()
398 9f774ee8 Michael Hanselmann
      finally:
399 9f774ee8 Michael Hanselmann
        fd.close()
400 9f774ee8 Michael Hanselmann
401 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
402 d2e03a33 Michael Hanselmann
      if not result[node_name]:
403 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
404 d2e03a33 Michael Hanselmann
405 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
406 d2e03a33 Michael Hanselmann
407 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
408 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
409 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
410 23752136 Michael Hanselmann
    try:
411 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
412 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
413 d2e03a33 Michael Hanselmann
    except KeyError:
414 23752136 Michael Hanselmann
      pass
415 23752136 Michael Hanselmann
416 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
417 e74798c1 Michael Hanselmann
    failed = []
418 e74798c1 Michael Hanselmann
    success = []
419 e74798c1 Michael Hanselmann
420 e74798c1 Michael Hanselmann
    for node in nodes:
421 e74798c1 Michael Hanselmann
      if result[node]:
422 e74798c1 Michael Hanselmann
        success.append(node)
423 e74798c1 Michael Hanselmann
      else:
424 e74798c1 Michael Hanselmann
        failed.append(node)
425 e74798c1 Michael Hanselmann
426 e74798c1 Michael Hanselmann
    if failed:
427 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
428 e74798c1 Michael Hanselmann
429 e74798c1 Michael Hanselmann
    # +1 for the master node
430 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
431 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
432 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
433 e74798c1 Michael Hanselmann
434 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
435 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
436 8e00939c Michael Hanselmann

437 8e00939c Michael Hanselmann
    """
438 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
439 8e00939c Michael Hanselmann
440 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
441 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
442 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
443 23752136 Michael Hanselmann
444 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
445 abc1f2ce Michael Hanselmann
    os.rename(old, new)
446 abc1f2ce Michael Hanselmann
447 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
448 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
449 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
450 abc1f2ce Michael Hanselmann
451 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
452 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
453 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
454 85f03e0d Michael Hanselmann
    if job_id < 0:
455 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
456 85f03e0d Michael Hanselmann
457 85f03e0d Michael Hanselmann
    return str(job_id)
458 85f03e0d Michael Hanselmann
459 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
460 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
461 f1da30e6 Michael Hanselmann

462 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
463 f1da30e6 Michael Hanselmann

464 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
465 f1da30e6 Michael Hanselmann

466 f1da30e6 Michael Hanselmann
    """
467 f1da30e6 Michael Hanselmann
    # New number
468 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
469 f1da30e6 Michael Hanselmann
470 f1da30e6 Michael Hanselmann
    # Write to file
471 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
472 23752136 Michael Hanselmann
                                        "%s\n" % serial)
473 f1da30e6 Michael Hanselmann
474 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
475 f1da30e6 Michael Hanselmann
    self._last_serial = serial
476 f1da30e6 Michael Hanselmann
477 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
478 f1da30e6 Michael Hanselmann
479 85f03e0d Michael Hanselmann
  @staticmethod
480 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
481 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
482 f1da30e6 Michael Hanselmann
483 85f03e0d Michael Hanselmann
  @staticmethod
484 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
485 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
486 0cb94105 Michael Hanselmann
487 85f03e0d Michael Hanselmann
  @classmethod
488 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
489 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
490 fae737ac Michael Hanselmann
    if m:
491 fae737ac Michael Hanselmann
      return m.group(1)
492 fae737ac Michael Hanselmann
    else:
493 fae737ac Michael Hanselmann
      return None
494 fae737ac Michael Hanselmann
495 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
496 911a495b Iustin Pop
    """Return all known job IDs.
497 911a495b Iustin Pop

498 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
499 911a495b Iustin Pop
    included. Currently this argument is unused.
500 911a495b Iustin Pop

501 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
502 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
503 ac0930b9 Iustin Pop
    extra IDs).
504 ac0930b9 Iustin Pop

505 911a495b Iustin Pop
    """
506 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
507 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
508 f0d874fe Iustin Pop
    return jlist
509 911a495b Iustin Pop
510 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
511 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
512 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
513 f1da30e6 Michael Hanselmann
514 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
515 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
516 5685c1a5 Michael Hanselmann
    if job:
517 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
518 5685c1a5 Michael Hanselmann
      return job
519 ac0930b9 Iustin Pop
520 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
521 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
522 f1da30e6 Michael Hanselmann
    try:
523 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
524 f1da30e6 Michael Hanselmann
    except IOError, err:
525 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
526 f1da30e6 Michael Hanselmann
        return None
527 f1da30e6 Michael Hanselmann
      raise
528 f1da30e6 Michael Hanselmann
    try:
529 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
530 f1da30e6 Michael Hanselmann
    finally:
531 f1da30e6 Michael Hanselmann
      fd.close()
532 f1da30e6 Michael Hanselmann
533 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
534 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
535 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
536 ac0930b9 Iustin Pop
    return job
537 f1da30e6 Michael Hanselmann
538 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
539 911a495b Iustin Pop
    if not job_ids:
540 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
541 f1da30e6 Michael Hanselmann
542 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
543 f1da30e6 Michael Hanselmann
544 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
545 db37da70 Michael Hanselmann
  @_RequireOpenQueue
546 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
547 85f03e0d Michael Hanselmann
    """Create and store a new job.
548 f1da30e6 Michael Hanselmann

549 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
550 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
551 c3f0a12f Iustin Pop

552 c3f0a12f Iustin Pop
    @type ops: list
553 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
554 c3f0a12f Iustin Pop

555 c3f0a12f Iustin Pop
    """
556 f1da30e6 Michael Hanselmann
    # Get job identifier
557 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
558 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
559 f1da30e6 Michael Hanselmann
560 f1da30e6 Michael Hanselmann
    # Write to disk
561 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
562 f1da30e6 Michael Hanselmann
563 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
564 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
565 ac0930b9 Iustin Pop
566 85f03e0d Michael Hanselmann
    # Add to worker pool
567 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
568 85f03e0d Michael Hanselmann
569 85f03e0d Michael Hanselmann
    return job.id
570 f1da30e6 Michael Hanselmann
571 db37da70 Michael Hanselmann
  @_RequireOpenQueue
572 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
573 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
574 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
575 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
576 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
577 ac0930b9 Iustin Pop
578 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
579 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
580 dfe57c22 Michael Hanselmann
581 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
582 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
583 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
584 5c735209 Iustin Pop
                        timeout):
585 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
586 6c5a7090 Michael Hanselmann

587 6c5a7090 Michael Hanselmann
    @type job_id: string
588 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
589 6c5a7090 Michael Hanselmann
    @type fields: list of strings
590 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
591 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
592 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
593 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
594 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
595 5c735209 Iustin Pop
    @type timeout: float
596 5c735209 Iustin Pop
    @param timeout: maximum time to wait
597 6c5a7090 Michael Hanselmann

598 6c5a7090 Michael Hanselmann
    """
599 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
600 5c735209 Iustin Pop
    end_time = time.time() + timeout
601 dfe57c22 Michael Hanselmann
    while True:
602 5c735209 Iustin Pop
      delta_time = end_time - time.time()
603 5c735209 Iustin Pop
      if delta_time < 0:
604 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
605 5c735209 Iustin Pop
606 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
607 6c5a7090 Michael Hanselmann
      if not job:
608 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
609 6c5a7090 Michael Hanselmann
        break
610 dfe57c22 Michael Hanselmann
611 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
612 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
613 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
614 dfe57c22 Michael Hanselmann
615 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
616 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
617 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
618 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
619 dfe57c22 Michael Hanselmann
      # significantly different.
620 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
621 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
622 dfe57c22 Michael Hanselmann
623 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
624 6c5a7090 Michael Hanselmann
                        constants.JOB_STATUS_RUNNING):
625 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
626 6c5a7090 Michael Hanselmann
        # no changes.
627 dfe57c22 Michael Hanselmann
        break
628 dfe57c22 Michael Hanselmann
629 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
630 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
631 6c5a7090 Michael Hanselmann
        break
632 6c5a7090 Michael Hanselmann
633 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
634 6c5a7090 Michael Hanselmann
635 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
636 5c735209 Iustin Pop
      job.change.wait(delta_time)
637 dfe57c22 Michael Hanselmann
638 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
639 dfe57c22 Michael Hanselmann
640 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
641 dfe57c22 Michael Hanselmann
642 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
643 db37da70 Michael Hanselmann
  @_RequireOpenQueue
644 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
645 188c5e0a Michael Hanselmann
    """Cancels a job.
646 188c5e0a Michael Hanselmann

647 188c5e0a Michael Hanselmann
    @type job_id: string
648 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
649 188c5e0a Michael Hanselmann

650 188c5e0a Michael Hanselmann
    """
651 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
652 188c5e0a Michael Hanselmann
653 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
654 188c5e0a Michael Hanselmann
    if not job:
655 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
656 188c5e0a Michael Hanselmann
      return
657 188c5e0a Michael Hanselmann
658 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
659 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
660 188c5e0a Michael Hanselmann
      return
661 188c5e0a Michael Hanselmann
662 85f03e0d Michael Hanselmann
    try:
663 85f03e0d Michael Hanselmann
      for op in job.ops:
664 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
665 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
666 85f03e0d Michael Hanselmann
    finally:
667 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
668 188c5e0a Michael Hanselmann
669 c609f802 Michael Hanselmann
  @utils.LockedMethod
670 db37da70 Michael Hanselmann
  @_RequireOpenQueue
671 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
672 c609f802 Michael Hanselmann
    """Archives a job.
673 c609f802 Michael Hanselmann

674 c609f802 Michael Hanselmann
    @type job_id: string
675 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
676 c609f802 Michael Hanselmann

677 c609f802 Michael Hanselmann
    """
678 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
679 c609f802 Michael Hanselmann
680 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
681 c609f802 Michael Hanselmann
    if not job:
682 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
683 c609f802 Michael Hanselmann
      return
684 c609f802 Michael Hanselmann
685 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
686 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
687 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
688 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
689 c609f802 Michael Hanselmann
      return
690 c609f802 Michael Hanselmann
691 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
692 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
693 c609f802 Michael Hanselmann
694 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
695 c609f802 Michael Hanselmann
696 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
697 f1da30e6 Michael Hanselmann
698 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
699 e2715f69 Michael Hanselmann
    row = []
700 e2715f69 Michael Hanselmann
    for fname in fields:
701 e2715f69 Michael Hanselmann
      if fname == "id":
702 e2715f69 Michael Hanselmann
        row.append(job.id)
703 e2715f69 Michael Hanselmann
      elif fname == "status":
704 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
705 af30b2fd Michael Hanselmann
      elif fname == "ops":
706 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
707 af30b2fd Michael Hanselmann
      elif fname == "opresult":
708 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
709 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
710 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
711 5b23c34c Iustin Pop
      elif fname == "oplog":
712 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
713 c56ec146 Iustin Pop
      elif fname == "opstart":
714 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
715 c56ec146 Iustin Pop
      elif fname == "opend":
716 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
717 c56ec146 Iustin Pop
      elif fname == "received_ts":
718 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
719 c56ec146 Iustin Pop
      elif fname == "start_ts":
720 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
721 c56ec146 Iustin Pop
      elif fname == "end_ts":
722 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
723 60dd1473 Iustin Pop
      elif fname == "summary":
724 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
725 e2715f69 Michael Hanselmann
      else:
726 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
727 e2715f69 Michael Hanselmann
    return row
728 e2715f69 Michael Hanselmann
729 85f03e0d Michael Hanselmann
  @utils.LockedMethod
730 db37da70 Michael Hanselmann
  @_RequireOpenQueue
731 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
732 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
733 e2715f69 Michael Hanselmann

734 e2715f69 Michael Hanselmann
    Args:
735 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
736 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
737 e2715f69 Michael Hanselmann

738 e2715f69 Michael Hanselmann
    """
739 85f03e0d Michael Hanselmann
    jobs = []
740 e2715f69 Michael Hanselmann
741 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
742 85f03e0d Michael Hanselmann
      if job is None:
743 85f03e0d Michael Hanselmann
        jobs.append(None)
744 85f03e0d Michael Hanselmann
      else:
745 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
746 e2715f69 Michael Hanselmann
747 85f03e0d Michael Hanselmann
    return jobs
748 e2715f69 Michael Hanselmann
749 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
750 db37da70 Michael Hanselmann
  @_RequireOpenQueue
751 e2715f69 Michael Hanselmann
  def Shutdown(self):
752 e2715f69 Michael Hanselmann
    """Stops the job queue.
753 e2715f69 Michael Hanselmann

754 e2715f69 Michael Hanselmann
    """
755 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
756 85f03e0d Michael Hanselmann
757 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
758 04ab05ce Michael Hanselmann
    self._queue_lock = None