Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ a3811745

History | View | Annotate | Download (33.6 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
51 e2715f69 Michael Hanselmann
52 498ae1cc Iustin Pop
53 70552c46 Michael Hanselmann
def TimeStampNow():
54 ea03467c Iustin Pop
  """Returns the current timestamp.
55 ea03467c Iustin Pop

56 ea03467c Iustin Pop
  @rtype: tuple
57 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
58 ea03467c Iustin Pop

59 ea03467c Iustin Pop
  """
60 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
61 70552c46 Michael Hanselmann
62 70552c46 Michael Hanselmann
63 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
64 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
65 e2715f69 Michael Hanselmann

66 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
67 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
68 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
69 ea03467c Iustin Pop
  @ivar status: the current status
70 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
71 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
72 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
73 f1048938 Iustin Pop

74 e2715f69 Michael Hanselmann
  """
75 85f03e0d Michael Hanselmann
  def __init__(self, op):
76 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
77 ea03467c Iustin Pop

78 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
79 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
80 ea03467c Iustin Pop

81 ea03467c Iustin Pop
    """
82 85f03e0d Michael Hanselmann
    self.input = op
83 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
84 85f03e0d Michael Hanselmann
    self.result = None
85 85f03e0d Michael Hanselmann
    self.log = []
86 70552c46 Michael Hanselmann
    self.start_timestamp = None
87 70552c46 Michael Hanselmann
    self.end_timestamp = None
88 f1da30e6 Michael Hanselmann
89 f1da30e6 Michael Hanselmann
  @classmethod
90 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
91 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
92 ea03467c Iustin Pop

93 ea03467c Iustin Pop
    @type state: dict
94 ea03467c Iustin Pop
    @param state: the serialized state
95 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
96 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
97 ea03467c Iustin Pop

98 ea03467c Iustin Pop
    """
99 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
100 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
101 85f03e0d Michael Hanselmann
    obj.status = state["status"]
102 85f03e0d Michael Hanselmann
    obj.result = state["result"]
103 85f03e0d Michael Hanselmann
    obj.log = state["log"]
104 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
105 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
106 f1da30e6 Michael Hanselmann
    return obj
107 f1da30e6 Michael Hanselmann
108 f1da30e6 Michael Hanselmann
  def Serialize(self):
109 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
110 ea03467c Iustin Pop

111 ea03467c Iustin Pop
    @rtype: dict
112 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
113 ea03467c Iustin Pop

114 ea03467c Iustin Pop
    """
115 6c5a7090 Michael Hanselmann
    return {
116 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
117 6c5a7090 Michael Hanselmann
      "status": self.status,
118 6c5a7090 Michael Hanselmann
      "result": self.result,
119 6c5a7090 Michael Hanselmann
      "log": self.log,
120 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
121 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
122 6c5a7090 Michael Hanselmann
      }
123 f1048938 Iustin Pop
124 e2715f69 Michael Hanselmann
125 e2715f69 Michael Hanselmann
class _QueuedJob(object):
126 e2715f69 Michael Hanselmann
  """In-memory job representation.
127 e2715f69 Michael Hanselmann

128 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
129 ea03467c Iustin Pop
  be taken care of by users of this class.
130 ea03467c Iustin Pop

131 ea03467c Iustin Pop
  @type queue: L{JobQueue}
132 ea03467c Iustin Pop
  @ivar queue: the parent queue
133 ea03467c Iustin Pop
  @ivar id: the job ID
134 ea03467c Iustin Pop
  @type ops: list
135 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
136 ea03467c Iustin Pop
  @type run_op_index: int
137 ea03467c Iustin Pop
  @ivar run_op_index: the currently executing opcode, or -1 if
138 ea03467c Iustin Pop
      we didn't yet start executing
139 ea03467c Iustin Pop
  @type log_serial: int
140 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
141 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
142 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
143 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
144 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
145 e2715f69 Michael Hanselmann

146 e2715f69 Michael Hanselmann
  """
147 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
148 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
149 ea03467c Iustin Pop

150 ea03467c Iustin Pop
    @type queue: L{JobQueue}
151 ea03467c Iustin Pop
    @param queue: our parent queue
152 ea03467c Iustin Pop
    @type job_id: job_id
153 ea03467c Iustin Pop
    @param job_id: our job id
154 ea03467c Iustin Pop
    @type ops: list
155 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
156 ea03467c Iustin Pop
        in _QueuedOpCodes
157 ea03467c Iustin Pop

158 ea03467c Iustin Pop
    """
159 e2715f69 Michael Hanselmann
    if not ops:
160 ea03467c Iustin Pop
      # TODO: use a better exception
161 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
162 e2715f69 Michael Hanselmann
163 85f03e0d Michael Hanselmann
    self.queue = queue
164 f1da30e6 Michael Hanselmann
    self.id = job_id
165 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
166 85f03e0d Michael Hanselmann
    self.run_op_index = -1
167 6c5a7090 Michael Hanselmann
    self.log_serial = 0
168 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
169 c56ec146 Iustin Pop
    self.start_timestamp = None
170 c56ec146 Iustin Pop
    self.end_timestamp = None
171 6c5a7090 Michael Hanselmann
172 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
173 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
174 f1da30e6 Michael Hanselmann
175 f1da30e6 Michael Hanselmann
  @classmethod
176 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
177 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
178 ea03467c Iustin Pop

179 ea03467c Iustin Pop
    @type queue: L{JobQueue}
180 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
181 ea03467c Iustin Pop
    @type state: dict
182 ea03467c Iustin Pop
    @param state: the serialized state
183 ea03467c Iustin Pop
    @rtype: _JobQueue
184 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
185 ea03467c Iustin Pop

186 ea03467c Iustin Pop
    """
187 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
188 85f03e0d Michael Hanselmann
    obj.queue = queue
189 85f03e0d Michael Hanselmann
    obj.id = state["id"]
190 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
191 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
192 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
193 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
194 6c5a7090 Michael Hanselmann
195 6c5a7090 Michael Hanselmann
    obj.ops = []
196 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
197 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
198 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
199 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
200 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
201 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
202 6c5a7090 Michael Hanselmann
203 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
204 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
205 6c5a7090 Michael Hanselmann
206 f1da30e6 Michael Hanselmann
    return obj
207 f1da30e6 Michael Hanselmann
208 f1da30e6 Michael Hanselmann
  def Serialize(self):
209 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
210 ea03467c Iustin Pop

211 ea03467c Iustin Pop
    @rtype: dict
212 ea03467c Iustin Pop
    @return: the serialized state
213 ea03467c Iustin Pop

214 ea03467c Iustin Pop
    """
215 f1da30e6 Michael Hanselmann
    return {
216 f1da30e6 Michael Hanselmann
      "id": self.id,
217 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
218 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
219 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
220 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
221 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
222 f1da30e6 Michael Hanselmann
      }
223 f1da30e6 Michael Hanselmann
224 85f03e0d Michael Hanselmann
  def CalcStatus(self):
225 ea03467c Iustin Pop
    """Compute the status of this job.
226 ea03467c Iustin Pop

227 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
228 ea03467c Iustin Pop
    based on their status, computes the job status.
229 ea03467c Iustin Pop

230 ea03467c Iustin Pop
    The algorithm is:
231 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
232 ea03467c Iustin Pop
        status will be the same
233 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
234 ea03467c Iustin Pop
          - waitlock
235 ea03467c Iustin Pop
          - running
236 ea03467c Iustin Pop

237 ea03467c Iustin Pop
        will determine the job status
238 ea03467c Iustin Pop

239 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
240 ea03467c Iustin Pop
        and the job status will be the same
241 ea03467c Iustin Pop

242 ea03467c Iustin Pop
    @return: the job status
243 ea03467c Iustin Pop

244 ea03467c Iustin Pop
    """
245 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
246 e2715f69 Michael Hanselmann
247 e2715f69 Michael Hanselmann
    all_success = True
248 85f03e0d Michael Hanselmann
    for op in self.ops:
249 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
250 e2715f69 Michael Hanselmann
        continue
251 e2715f69 Michael Hanselmann
252 e2715f69 Michael Hanselmann
      all_success = False
253 e2715f69 Michael Hanselmann
254 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
255 e2715f69 Michael Hanselmann
        pass
256 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
257 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
258 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
259 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
260 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
261 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
262 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
263 f1da30e6 Michael Hanselmann
        break
264 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
265 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
266 4cb1d919 Michael Hanselmann
        break
267 e2715f69 Michael Hanselmann
268 e2715f69 Michael Hanselmann
    if all_success:
269 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
270 e2715f69 Michael Hanselmann
271 e2715f69 Michael Hanselmann
    return status
272 e2715f69 Michael Hanselmann
273 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
274 ea03467c Iustin Pop
    """Selectively returns the log entries.
275 ea03467c Iustin Pop

276 ea03467c Iustin Pop
    @type newer_than: None or int
277 ea03467c Iustin Pop
    @param newer_than: if this is None, return all log enties,
278 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
279 ea03467c Iustin Pop
        than this value
280 ea03467c Iustin Pop
    @rtype: list
281 ea03467c Iustin Pop
    @return: the list of the log entries selected
282 ea03467c Iustin Pop

283 ea03467c Iustin Pop
    """
284 6c5a7090 Michael Hanselmann
    if newer_than is None:
285 6c5a7090 Michael Hanselmann
      serial = -1
286 6c5a7090 Michael Hanselmann
    else:
287 6c5a7090 Michael Hanselmann
      serial = newer_than
288 6c5a7090 Michael Hanselmann
289 6c5a7090 Michael Hanselmann
    entries = []
290 6c5a7090 Michael Hanselmann
    for op in self.ops:
291 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
292 6c5a7090 Michael Hanselmann
293 6c5a7090 Michael Hanselmann
    return entries
294 6c5a7090 Michael Hanselmann
295 f1048938 Iustin Pop
296 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
297 ea03467c Iustin Pop
  """The actual job workers.
298 ea03467c Iustin Pop

299 ea03467c Iustin Pop
  """
300 e92376d7 Iustin Pop
  def _NotifyStart(self):
301 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
302 e92376d7 Iustin Pop

303 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
304 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
305 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
306 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
307 e92376d7 Iustin Pop

308 e92376d7 Iustin Pop
    """
309 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
310 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
311 e92376d7 Iustin Pop
312 e92376d7 Iustin Pop
    self.queue.acquire()
313 e92376d7 Iustin Pop
    try:
314 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
315 e92376d7 Iustin Pop
    finally:
316 e92376d7 Iustin Pop
      self.queue.release()
317 e92376d7 Iustin Pop
318 85f03e0d Michael Hanselmann
  def RunTask(self, job):
319 e2715f69 Michael Hanselmann
    """Job executor.
320 e2715f69 Michael Hanselmann

321 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
322 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
323 e2715f69 Michael Hanselmann

324 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
325 ea03467c Iustin Pop
    @param job: the job to be processed
326 ea03467c Iustin Pop

327 e2715f69 Michael Hanselmann
    """
328 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
329 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
330 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
331 e92376d7 Iustin Pop
    self.queue = queue = job.queue
332 e2715f69 Michael Hanselmann
    try:
333 85f03e0d Michael Hanselmann
      try:
334 85f03e0d Michael Hanselmann
        count = len(job.ops)
335 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
336 85f03e0d Michael Hanselmann
          try:
337 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
338 85f03e0d Michael Hanselmann
339 85f03e0d Michael Hanselmann
            queue.acquire()
340 85f03e0d Michael Hanselmann
            try:
341 85f03e0d Michael Hanselmann
              job.run_op_index = idx
342 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
343 85f03e0d Michael Hanselmann
              op.result = None
344 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
345 c56ec146 Iustin Pop
              if idx == 0: # first opcode
346 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
347 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
348 85f03e0d Michael Hanselmann
349 38206f3c Iustin Pop
              input_opcode = op.input
350 85f03e0d Michael Hanselmann
            finally:
351 85f03e0d Michael Hanselmann
              queue.release()
352 85f03e0d Michael Hanselmann
353 dfe57c22 Michael Hanselmann
            def _Log(*args):
354 6c5a7090 Michael Hanselmann
              """Append a log entry.
355 6c5a7090 Michael Hanselmann

356 6c5a7090 Michael Hanselmann
              """
357 6c5a7090 Michael Hanselmann
              assert len(args) < 3
358 6c5a7090 Michael Hanselmann
359 6c5a7090 Michael Hanselmann
              if len(args) == 1:
360 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
361 6c5a7090 Michael Hanselmann
                log_msg = args[0]
362 6c5a7090 Michael Hanselmann
              else:
363 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
364 6c5a7090 Michael Hanselmann
365 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
366 6c5a7090 Michael Hanselmann
              # precision.
367 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
368 dfe57c22 Michael Hanselmann
369 6c5a7090 Michael Hanselmann
              queue.acquire()
370 dfe57c22 Michael Hanselmann
              try:
371 6c5a7090 Michael Hanselmann
                job.log_serial += 1
372 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
373 6c5a7090 Michael Hanselmann
374 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
375 dfe57c22 Michael Hanselmann
              finally:
376 6c5a7090 Michael Hanselmann
                queue.release()
377 dfe57c22 Michael Hanselmann
378 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
379 e92376d7 Iustin Pop
            self.opcode = op
380 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
381 85f03e0d Michael Hanselmann
382 85f03e0d Michael Hanselmann
            queue.acquire()
383 85f03e0d Michael Hanselmann
            try:
384 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
385 85f03e0d Michael Hanselmann
              op.result = result
386 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
387 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
388 85f03e0d Michael Hanselmann
            finally:
389 85f03e0d Michael Hanselmann
              queue.release()
390 85f03e0d Michael Hanselmann
391 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
392 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
393 85f03e0d Michael Hanselmann
          except Exception, err:
394 85f03e0d Michael Hanselmann
            queue.acquire()
395 85f03e0d Michael Hanselmann
            try:
396 85f03e0d Michael Hanselmann
              try:
397 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
398 85f03e0d Michael Hanselmann
                op.result = str(err)
399 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
400 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
401 85f03e0d Michael Hanselmann
              finally:
402 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
403 85f03e0d Michael Hanselmann
            finally:
404 85f03e0d Michael Hanselmann
              queue.release()
405 85f03e0d Michael Hanselmann
            raise
406 85f03e0d Michael Hanselmann
407 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
408 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
409 85f03e0d Michael Hanselmann
      except:
410 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
411 e2715f69 Michael Hanselmann
    finally:
412 85f03e0d Michael Hanselmann
      queue.acquire()
413 85f03e0d Michael Hanselmann
      try:
414 65548ed5 Michael Hanselmann
        try:
415 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
416 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
417 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
418 65548ed5 Michael Hanselmann
        finally:
419 65548ed5 Michael Hanselmann
          job_id = job.id
420 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
421 85f03e0d Michael Hanselmann
      finally:
422 85f03e0d Michael Hanselmann
        queue.release()
423 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
424 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
425 e2715f69 Michael Hanselmann
426 e2715f69 Michael Hanselmann
427 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
428 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
429 ea03467c Iustin Pop

430 ea03467c Iustin Pop
  """
431 5bdce580 Michael Hanselmann
  def __init__(self, queue):
432 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
433 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
434 5bdce580 Michael Hanselmann
    self.queue = queue
435 e2715f69 Michael Hanselmann
436 e2715f69 Michael Hanselmann
437 85f03e0d Michael Hanselmann
class JobQueue(object):
438 ea03467c Iustin Pop
  """Quue used to manaage the jobs.
439 ea03467c Iustin Pop

440 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
441 ea03467c Iustin Pop

442 ea03467c Iustin Pop
  """
443 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
444 f1da30e6 Michael Hanselmann
445 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
446 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
447 db37da70 Michael Hanselmann

448 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
449 ea03467c Iustin Pop
    functions usually called from other classes.
450 db37da70 Michael Hanselmann

451 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
452 db37da70 Michael Hanselmann

453 ea03467c Iustin Pop
    Example::
454 db37da70 Michael Hanselmann
      @utils.LockedMethod
455 db37da70 Michael Hanselmann
      @_RequireOpenQueue
456 db37da70 Michael Hanselmann
      def Example(self):
457 db37da70 Michael Hanselmann
        pass
458 db37da70 Michael Hanselmann

459 db37da70 Michael Hanselmann
    """
460 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
461 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
462 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
463 db37da70 Michael Hanselmann
    return wrapper
464 db37da70 Michael Hanselmann
465 85f03e0d Michael Hanselmann
  def __init__(self, context):
466 ea03467c Iustin Pop
    """Constructor for JobQueue.
467 ea03467c Iustin Pop

468 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
469 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
470 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
471 ea03467c Iustin Pop
    running).
472 ea03467c Iustin Pop

473 ea03467c Iustin Pop
    @type context: GanetiContext
474 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
475 ea03467c Iustin Pop
        data and other ganeti objects
476 ea03467c Iustin Pop

477 ea03467c Iustin Pop
    """
478 5bdce580 Michael Hanselmann
    self.context = context
479 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
480 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
481 f1da30e6 Michael Hanselmann
482 85f03e0d Michael Hanselmann
    # Locking
483 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
484 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
485 85f03e0d Michael Hanselmann
    self.release = self._lock.release
486 85f03e0d Michael Hanselmann
487 04ab05ce Michael Hanselmann
    # Initialize
488 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
489 f1da30e6 Michael Hanselmann
490 04ab05ce Michael Hanselmann
    # Read serial file
491 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
492 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
493 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
494 c4beba1c Iustin Pop
495 23752136 Michael Hanselmann
    # Get initial list of nodes
496 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
497 99aabbed Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values())
498 8e00939c Michael Hanselmann
499 8e00939c Michael Hanselmann
    # Remove master node
500 8e00939c Michael Hanselmann
    try:
501 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
502 8e00939c Michael Hanselmann
    except ValueError:
503 8e00939c Michael Hanselmann
      pass
504 23752136 Michael Hanselmann
505 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
506 23752136 Michael Hanselmann
507 85f03e0d Michael Hanselmann
    # Setup worker pool
508 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
509 85f03e0d Michael Hanselmann
510 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
511 85f03e0d Michael Hanselmann
    # we're still doing our work.
512 85f03e0d Michael Hanselmann
    self.acquire()
513 85f03e0d Michael Hanselmann
    try:
514 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
515 94ed59a5 Iustin Pop
        # a failure in loading the job can cause 'None' to be returned
516 94ed59a5 Iustin Pop
        if job is None:
517 94ed59a5 Iustin Pop
          continue
518 94ed59a5 Iustin Pop
519 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
520 85f03e0d Michael Hanselmann
521 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
522 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
523 85f03e0d Michael Hanselmann
524 e92376d7 Iustin Pop
        elif status in (constants.JOB_STATUS_RUNNING,
525 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
526 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
527 85f03e0d Michael Hanselmann
          try:
528 85f03e0d Michael Hanselmann
            for op in job.ops:
529 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
530 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
531 85f03e0d Michael Hanselmann
          finally:
532 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
533 85f03e0d Michael Hanselmann
    finally:
534 85f03e0d Michael Hanselmann
      self.release()
535 85f03e0d Michael Hanselmann
536 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
537 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
538 99aabbed Iustin Pop
  def AddNode(self, node):
539 99aabbed Iustin Pop
    """Register a new node with the queue.
540 99aabbed Iustin Pop

541 99aabbed Iustin Pop
    @type node: L{objects.Node}
542 99aabbed Iustin Pop
    @param node: the node object to be added
543 99aabbed Iustin Pop

544 99aabbed Iustin Pop
    """
545 99aabbed Iustin Pop
    node_name = node.name
546 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
547 23752136 Michael Hanselmann
548 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
549 a3811745 Michael Hanselmann
    rpc.RpcRunner.call_jobqueue_purge(node_name)
550 23752136 Michael Hanselmann
551 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
552 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
553 23752136 Michael Hanselmann
554 d2e03a33 Michael Hanselmann
    # Upload current serial file
555 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
556 d2e03a33 Michael Hanselmann
557 d2e03a33 Michael Hanselmann
    for file_name in files:
558 9f774ee8 Michael Hanselmann
      # Read file content
559 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
560 9f774ee8 Michael Hanselmann
      try:
561 9f774ee8 Michael Hanselmann
        content = fd.read()
562 9f774ee8 Michael Hanselmann
      finally:
563 9f774ee8 Michael Hanselmann
        fd.close()
564 9f774ee8 Michael Hanselmann
565 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
566 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
567 a3811745 Michael Hanselmann
                                                  file_name, content)
568 d2e03a33 Michael Hanselmann
      if not result[node_name]:
569 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
570 d2e03a33 Michael Hanselmann
571 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
572 d2e03a33 Michael Hanselmann
573 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
574 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
575 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
576 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
577 ea03467c Iustin Pop

578 ea03467c Iustin Pop
    @type node_name: str
579 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
580 ea03467c Iustin Pop

581 ea03467c Iustin Pop
    """
582 23752136 Michael Hanselmann
    try:
583 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
584 99aabbed Iustin Pop
      del self._nodes[node_name]
585 d2e03a33 Michael Hanselmann
    except KeyError:
586 23752136 Michael Hanselmann
      pass
587 23752136 Michael Hanselmann
588 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
589 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
590 ea03467c Iustin Pop

591 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
592 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
593 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
594 ea03467c Iustin Pop

595 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
596 ea03467c Iustin Pop
    @type nodes: list
597 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
598 ea03467c Iustin Pop
    @type failmsg: str
599 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
600 ea03467c Iustin Pop

601 ea03467c Iustin Pop
    """
602 e74798c1 Michael Hanselmann
    failed = []
603 e74798c1 Michael Hanselmann
    success = []
604 e74798c1 Michael Hanselmann
605 e74798c1 Michael Hanselmann
    for node in nodes:
606 e74798c1 Michael Hanselmann
      if result[node]:
607 e74798c1 Michael Hanselmann
        success.append(node)
608 e74798c1 Michael Hanselmann
      else:
609 e74798c1 Michael Hanselmann
        failed.append(node)
610 e74798c1 Michael Hanselmann
611 e74798c1 Michael Hanselmann
    if failed:
612 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
613 e74798c1 Michael Hanselmann
614 e74798c1 Michael Hanselmann
    # +1 for the master node
615 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
616 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
617 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
618 e74798c1 Michael Hanselmann
619 99aabbed Iustin Pop
  def _GetNodeIp(self):
620 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
621 99aabbed Iustin Pop

622 ea03467c Iustin Pop
    @rtype: (list, list)
623 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
624 ea03467c Iustin Pop
        names and the second one with the node addresses
625 ea03467c Iustin Pop

626 99aabbed Iustin Pop
    """
627 99aabbed Iustin Pop
    name_list = self._nodes.keys()
628 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
629 99aabbed Iustin Pop
    return name_list, addr_list
630 99aabbed Iustin Pop
631 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
632 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
633 8e00939c Michael Hanselmann

634 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
635 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
636 ea03467c Iustin Pop

637 ea03467c Iustin Pop
    @type file_name: str
638 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
639 ea03467c Iustin Pop
    @type data: str
640 ea03467c Iustin Pop
    @param data: the new contents of the file
641 ea03467c Iustin Pop

642 8e00939c Michael Hanselmann
    """
643 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
644 8e00939c Michael Hanselmann
645 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
646 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
647 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
648 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
649 23752136 Michael Hanselmann
650 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
651 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
652 ea03467c Iustin Pop

653 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
654 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
655 ea03467c Iustin Pop

656 ea03467c Iustin Pop
    @type old: str
657 ea03467c Iustin Pop
    @param old: the current name of the file
658 ea03467c Iustin Pop
    @type new: str
659 ea03467c Iustin Pop
    @param new: the new name of the file
660 ea03467c Iustin Pop

661 ea03467c Iustin Pop
    """
662 abc1f2ce Michael Hanselmann
    os.rename(old, new)
663 abc1f2ce Michael Hanselmann
664 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
665 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
666 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
667 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
668 abc1f2ce Michael Hanselmann
669 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
670 ea03467c Iustin Pop
    """Convert a job ID to string format.
671 ea03467c Iustin Pop

672 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
673 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
674 ea03467c Iustin Pop
    abstract this change.
675 ea03467c Iustin Pop

676 ea03467c Iustin Pop
    @type job_id: int or long
677 ea03467c Iustin Pop
    @param job_id: the numeric job id
678 ea03467c Iustin Pop
    @rtype: str
679 ea03467c Iustin Pop
    @return: the formatted job id
680 ea03467c Iustin Pop

681 ea03467c Iustin Pop
    """
682 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
683 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
684 85f03e0d Michael Hanselmann
    if job_id < 0:
685 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
686 85f03e0d Michael Hanselmann
687 85f03e0d Michael Hanselmann
    return str(job_id)
688 85f03e0d Michael Hanselmann
689 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
690 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
691 f1da30e6 Michael Hanselmann

692 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
693 f1da30e6 Michael Hanselmann

694 ea03467c Iustin Pop
    @rtype: str
695 ea03467c Iustin Pop
    @return: a string representing the job identifier.
696 f1da30e6 Michael Hanselmann

697 f1da30e6 Michael Hanselmann
    """
698 f1da30e6 Michael Hanselmann
    # New number
699 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
700 f1da30e6 Michael Hanselmann
701 f1da30e6 Michael Hanselmann
    # Write to file
702 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
703 23752136 Michael Hanselmann
                                        "%s\n" % serial)
704 f1da30e6 Michael Hanselmann
705 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
706 f1da30e6 Michael Hanselmann
    self._last_serial = serial
707 f1da30e6 Michael Hanselmann
708 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
709 f1da30e6 Michael Hanselmann
710 85f03e0d Michael Hanselmann
  @staticmethod
711 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
712 ea03467c Iustin Pop
    """Returns the job file for a given job id.
713 ea03467c Iustin Pop

714 ea03467c Iustin Pop
    @type job_id: str
715 ea03467c Iustin Pop
    @param job_id: the job identifier
716 ea03467c Iustin Pop
    @rtype: str
717 ea03467c Iustin Pop
    @return: the path to the job file
718 ea03467c Iustin Pop

719 ea03467c Iustin Pop
    """
720 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
721 f1da30e6 Michael Hanselmann
722 85f03e0d Michael Hanselmann
  @staticmethod
723 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
724 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
725 ea03467c Iustin Pop

726 ea03467c Iustin Pop
    @type job_id: str
727 ea03467c Iustin Pop
    @param job_id: the job identifier
728 ea03467c Iustin Pop
    @rtype: str
729 ea03467c Iustin Pop
    @return: the path to the archived job file
730 ea03467c Iustin Pop

731 ea03467c Iustin Pop
    """
732 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
733 0cb94105 Michael Hanselmann
734 85f03e0d Michael Hanselmann
  @classmethod
735 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
736 ea03467c Iustin Pop
    """Extract the job id from a filename.
737 ea03467c Iustin Pop

738 ea03467c Iustin Pop
    @type name: str
739 ea03467c Iustin Pop
    @param name: the job filename
740 ea03467c Iustin Pop
    @rtype: job id or None
741 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
742 ea03467c Iustin Pop
        or None if the filename does not represent a valid
743 ea03467c Iustin Pop
        job file
744 ea03467c Iustin Pop

745 ea03467c Iustin Pop
    """
746 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
747 fae737ac Michael Hanselmann
    if m:
748 fae737ac Michael Hanselmann
      return m.group(1)
749 fae737ac Michael Hanselmann
    else:
750 fae737ac Michael Hanselmann
      return None
751 fae737ac Michael Hanselmann
752 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
753 911a495b Iustin Pop
    """Return all known job IDs.
754 911a495b Iustin Pop

755 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
756 911a495b Iustin Pop
    included. Currently this argument is unused.
757 911a495b Iustin Pop

758 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
759 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
760 ac0930b9 Iustin Pop
    extra IDs).
761 ac0930b9 Iustin Pop

762 ea03467c Iustin Pop
    @rtype: list
763 ea03467c Iustin Pop
    @return: the list of job IDs
764 ea03467c Iustin Pop

765 911a495b Iustin Pop
    """
766 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
767 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
768 f0d874fe Iustin Pop
    return jlist
769 911a495b Iustin Pop
770 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
771 ea03467c Iustin Pop
    """Returns the list of current job files.
772 ea03467c Iustin Pop

773 ea03467c Iustin Pop
    @rtype: list
774 ea03467c Iustin Pop
    @return: the list of job file names
775 ea03467c Iustin Pop

776 ea03467c Iustin Pop
    """
777 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
778 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
779 f1da30e6 Michael Hanselmann
780 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
781 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
782 ea03467c Iustin Pop

783 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
784 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
785 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
786 ea03467c Iustin Pop

787 ea03467c Iustin Pop
    @param job_id: the job id
788 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
789 ea03467c Iustin Pop
    @return: either None or the job object
790 ea03467c Iustin Pop

791 ea03467c Iustin Pop
    """
792 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
793 5685c1a5 Michael Hanselmann
    if job:
794 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
795 5685c1a5 Michael Hanselmann
      return job
796 ac0930b9 Iustin Pop
797 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
798 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
799 f1da30e6 Michael Hanselmann
    try:
800 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
801 f1da30e6 Michael Hanselmann
    except IOError, err:
802 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
803 f1da30e6 Michael Hanselmann
        return None
804 f1da30e6 Michael Hanselmann
      raise
805 f1da30e6 Michael Hanselmann
    try:
806 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
807 f1da30e6 Michael Hanselmann
    finally:
808 f1da30e6 Michael Hanselmann
      fd.close()
809 f1da30e6 Michael Hanselmann
810 94ed59a5 Iustin Pop
    try:
811 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
812 94ed59a5 Iustin Pop
    except Exception, err:
813 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
814 94ed59a5 Iustin Pop
      if filepath == new_path:
815 94ed59a5 Iustin Pop
        # job already archived (future case)
816 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
817 94ed59a5 Iustin Pop
      else:
818 94ed59a5 Iustin Pop
        # non-archived case
819 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
820 94ed59a5 Iustin Pop
        self._RenameFileUnlocked(filepath, new_path)
821 94ed59a5 Iustin Pop
      return None
822 94ed59a5 Iustin Pop
823 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
824 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
825 ac0930b9 Iustin Pop
    return job
826 f1da30e6 Michael Hanselmann
827 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
828 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
829 ea03467c Iustin Pop

830 ea03467c Iustin Pop
    @type job_ids: list
831 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
832 ea03467c Iustin Pop
        or a list of job IDs
833 ea03467c Iustin Pop
    @rtype: list
834 ea03467c Iustin Pop
    @return: the list of job objects
835 ea03467c Iustin Pop

836 ea03467c Iustin Pop
    """
837 911a495b Iustin Pop
    if not job_ids:
838 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
839 f1da30e6 Michael Hanselmann
840 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
841 f1da30e6 Michael Hanselmann
842 686d7433 Iustin Pop
  @staticmethod
843 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
844 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
845 686d7433 Iustin Pop

846 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
847 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
848 686d7433 Iustin Pop

849 ea03467c Iustin Pop
    @rtype: boolean
850 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
851 ea03467c Iustin Pop

852 686d7433 Iustin Pop
    """
853 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
854 686d7433 Iustin Pop
855 3ccafd0e Iustin Pop
  @staticmethod
856 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
857 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
858 3ccafd0e Iustin Pop

859 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
860 3ccafd0e Iustin Pop
    and in the future we might merge them.
861 3ccafd0e Iustin Pop

862 ea03467c Iustin Pop
    @type drain_flag: boolean
863 ea03467c Iustin Pop
    @param drain_flag: wheter to set or unset the drain flag
864 ea03467c Iustin Pop

865 3ccafd0e Iustin Pop
    """
866 3ccafd0e Iustin Pop
    if drain_flag:
867 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
868 3ccafd0e Iustin Pop
    else:
869 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
870 3ccafd0e Iustin Pop
    return True
871 3ccafd0e Iustin Pop
872 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
873 db37da70 Michael Hanselmann
  @_RequireOpenQueue
874 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
875 85f03e0d Michael Hanselmann
    """Create and store a new job.
876 f1da30e6 Michael Hanselmann

877 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
878 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
879 c3f0a12f Iustin Pop

880 c3f0a12f Iustin Pop
    @type ops: list
881 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
882 ea03467c Iustin Pop
    @rtype: job ID
883 ea03467c Iustin Pop
    @return: the job ID of the newly created job
884 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
885 c3f0a12f Iustin Pop

886 c3f0a12f Iustin Pop
    """
887 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
888 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
889 f1da30e6 Michael Hanselmann
    # Get job identifier
890 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
891 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
892 f1da30e6 Michael Hanselmann
893 f1da30e6 Michael Hanselmann
    # Write to disk
894 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
895 f1da30e6 Michael Hanselmann
896 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
897 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
898 ac0930b9 Iustin Pop
899 85f03e0d Michael Hanselmann
    # Add to worker pool
900 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
901 85f03e0d Michael Hanselmann
902 85f03e0d Michael Hanselmann
    return job.id
903 f1da30e6 Michael Hanselmann
904 db37da70 Michael Hanselmann
  @_RequireOpenQueue
905 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
906 ea03467c Iustin Pop
    """Update a job's on disk storage.
907 ea03467c Iustin Pop

908 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
909 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
910 ea03467c Iustin Pop
    nodes.
911 ea03467c Iustin Pop

912 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
913 ea03467c Iustin Pop
    @param job: the changed job
914 ea03467c Iustin Pop

915 ea03467c Iustin Pop
    """
916 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
917 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
918 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
919 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
920 ac0930b9 Iustin Pop
921 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
922 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
923 dfe57c22 Michael Hanselmann
924 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
925 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
926 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
927 5c735209 Iustin Pop
                        timeout):
928 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
929 6c5a7090 Michael Hanselmann

930 6c5a7090 Michael Hanselmann
    @type job_id: string
931 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
932 6c5a7090 Michael Hanselmann
    @type fields: list of strings
933 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
934 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
935 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
936 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
937 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
938 5c735209 Iustin Pop
    @type timeout: float
939 5c735209 Iustin Pop
    @param timeout: maximum time to wait
940 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
941 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
942 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
943 ea03467c Iustin Pop

944 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
945 ea03467c Iustin Pop
        we instead return a special value,
946 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
947 ea03467c Iustin Pop
        as such by the clients
948 6c5a7090 Michael Hanselmann

949 6c5a7090 Michael Hanselmann
    """
950 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
951 5c735209 Iustin Pop
    end_time = time.time() + timeout
952 dfe57c22 Michael Hanselmann
    while True:
953 5c735209 Iustin Pop
      delta_time = end_time - time.time()
954 5c735209 Iustin Pop
      if delta_time < 0:
955 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
956 5c735209 Iustin Pop
957 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
958 6c5a7090 Michael Hanselmann
      if not job:
959 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
960 6c5a7090 Michael Hanselmann
        break
961 dfe57c22 Michael Hanselmann
962 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
963 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
964 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
965 dfe57c22 Michael Hanselmann
966 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
967 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
968 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
969 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
970 dfe57c22 Michael Hanselmann
      # significantly different.
971 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
972 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
973 dfe57c22 Michael Hanselmann
974 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
975 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
976 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
977 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
978 6c5a7090 Michael Hanselmann
        # no changes.
979 dfe57c22 Michael Hanselmann
        break
980 dfe57c22 Michael Hanselmann
981 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
982 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
983 6c5a7090 Michael Hanselmann
        break
984 6c5a7090 Michael Hanselmann
985 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
986 6c5a7090 Michael Hanselmann
987 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
988 5c735209 Iustin Pop
      job.change.wait(delta_time)
989 dfe57c22 Michael Hanselmann
990 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
991 dfe57c22 Michael Hanselmann
992 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
993 dfe57c22 Michael Hanselmann
994 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
995 db37da70 Michael Hanselmann
  @_RequireOpenQueue
996 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
997 188c5e0a Michael Hanselmann
    """Cancels a job.
998 188c5e0a Michael Hanselmann

999 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1000 ea03467c Iustin Pop

1001 188c5e0a Michael Hanselmann
    @type job_id: string
1002 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1003 188c5e0a Michael Hanselmann

1004 188c5e0a Michael Hanselmann
    """
1005 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
1006 188c5e0a Michael Hanselmann
1007 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1008 188c5e0a Michael Hanselmann
    if not job:
1009 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1010 188c5e0a Michael Hanselmann
      return
1011 188c5e0a Michael Hanselmann
1012 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
1013 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
1014 188c5e0a Michael Hanselmann
      return
1015 188c5e0a Michael Hanselmann
1016 85f03e0d Michael Hanselmann
    try:
1017 85f03e0d Michael Hanselmann
      for op in job.ops:
1018 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1019 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
1020 85f03e0d Michael Hanselmann
    finally:
1021 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1022 188c5e0a Michael Hanselmann
1023 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1024 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
1025 c609f802 Michael Hanselmann
    """Archives a job.
1026 c609f802 Michael Hanselmann

1027 c609f802 Michael Hanselmann
    @type job_id: string
1028 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
1029 c609f802 Michael Hanselmann

1030 c609f802 Michael Hanselmann
    """
1031 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
1032 c609f802 Michael Hanselmann
1033 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1034 c609f802 Michael Hanselmann
    if not job:
1035 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1036 c609f802 Michael Hanselmann
      return
1037 c609f802 Michael Hanselmann
1038 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1039 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
1040 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
1041 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
1042 c609f802 Michael Hanselmann
      return
1043 c609f802 Michael Hanselmann
1044 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
1045 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
1046 c609f802 Michael Hanselmann
1047 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
1048 c609f802 Michael Hanselmann
1049 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
1050 f1da30e6 Michael Hanselmann
1051 07cd723a Iustin Pop
  @utils.LockedMethod
1052 07cd723a Iustin Pop
  @_RequireOpenQueue
1053 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1054 07cd723a Iustin Pop
    """Archives a job.
1055 07cd723a Iustin Pop

1056 ea03467c Iustin Pop
    This is just a wrapper over L{_ArchiveJobUnlocked}.
1057 ea03467c Iustin Pop

1058 07cd723a Iustin Pop
    @type job_id: string
1059 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1060 07cd723a Iustin Pop

1061 07cd723a Iustin Pop
    """
1062 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
1063 07cd723a Iustin Pop
1064 07cd723a Iustin Pop
  @utils.LockedMethod
1065 07cd723a Iustin Pop
  @_RequireOpenQueue
1066 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
1067 07cd723a Iustin Pop
    """Archives all jobs based on age.
1068 07cd723a Iustin Pop

1069 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1070 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1071 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1072 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1073 07cd723a Iustin Pop

1074 07cd723a Iustin Pop
    @type age: int
1075 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1076 07cd723a Iustin Pop

1077 07cd723a Iustin Pop
    """
1078 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1079 07cd723a Iustin Pop
1080 07cd723a Iustin Pop
    now = time.time()
1081 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
1082 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
1083 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1084 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
1085 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
1086 07cd723a Iustin Pop
        continue
1087 07cd723a Iustin Pop
      if job.end_timestamp is None:
1088 07cd723a Iustin Pop
        if job.start_timestamp is None:
1089 07cd723a Iustin Pop
          job_age = job.received_timestamp
1090 07cd723a Iustin Pop
        else:
1091 07cd723a Iustin Pop
          job_age = job.start_timestamp
1092 07cd723a Iustin Pop
      else:
1093 07cd723a Iustin Pop
        job_age = job.end_timestamp
1094 07cd723a Iustin Pop
1095 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
1096 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
1097 07cd723a Iustin Pop
1098 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1099 ea03467c Iustin Pop
    """Returns information about a job.
1100 ea03467c Iustin Pop

1101 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1102 ea03467c Iustin Pop
    @param job: the job which we query
1103 ea03467c Iustin Pop
    @type fields: list
1104 ea03467c Iustin Pop
    @param fields: names of fields to return
1105 ea03467c Iustin Pop
    @rtype: list
1106 ea03467c Iustin Pop
    @return: list with one element for each field
1107 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1108 ea03467c Iustin Pop
        has been passed
1109 ea03467c Iustin Pop

1110 ea03467c Iustin Pop
    """
1111 e2715f69 Michael Hanselmann
    row = []
1112 e2715f69 Michael Hanselmann
    for fname in fields:
1113 e2715f69 Michael Hanselmann
      if fname == "id":
1114 e2715f69 Michael Hanselmann
        row.append(job.id)
1115 e2715f69 Michael Hanselmann
      elif fname == "status":
1116 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1117 af30b2fd Michael Hanselmann
      elif fname == "ops":
1118 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1119 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1120 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1121 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1122 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1123 5b23c34c Iustin Pop
      elif fname == "oplog":
1124 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1125 c56ec146 Iustin Pop
      elif fname == "opstart":
1126 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1127 c56ec146 Iustin Pop
      elif fname == "opend":
1128 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1129 c56ec146 Iustin Pop
      elif fname == "received_ts":
1130 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1131 c56ec146 Iustin Pop
      elif fname == "start_ts":
1132 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1133 c56ec146 Iustin Pop
      elif fname == "end_ts":
1134 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1135 60dd1473 Iustin Pop
      elif fname == "summary":
1136 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1137 e2715f69 Michael Hanselmann
      else:
1138 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1139 e2715f69 Michael Hanselmann
    return row
1140 e2715f69 Michael Hanselmann
1141 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1142 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1143 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1144 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1145 e2715f69 Michael Hanselmann

1146 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1147 ea03467c Iustin Pop
    processing for each job.
1148 ea03467c Iustin Pop

1149 ea03467c Iustin Pop
    @type job_ids: list
1150 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1151 ea03467c Iustin Pop
    @type fields: list
1152 ea03467c Iustin Pop
    @param fields: names of fields to return
1153 ea03467c Iustin Pop
    @rtype: list
1154 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1155 ea03467c Iustin Pop
        the requested fields
1156 e2715f69 Michael Hanselmann

1157 e2715f69 Michael Hanselmann
    """
1158 85f03e0d Michael Hanselmann
    jobs = []
1159 e2715f69 Michael Hanselmann
1160 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1161 85f03e0d Michael Hanselmann
      if job is None:
1162 85f03e0d Michael Hanselmann
        jobs.append(None)
1163 85f03e0d Michael Hanselmann
      else:
1164 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1165 e2715f69 Michael Hanselmann
1166 85f03e0d Michael Hanselmann
    return jobs
1167 e2715f69 Michael Hanselmann
1168 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1169 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1170 e2715f69 Michael Hanselmann
  def Shutdown(self):
1171 e2715f69 Michael Hanselmann
    """Stops the job queue.
1172 e2715f69 Michael Hanselmann

1173 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1174 ea03467c Iustin Pop

1175 e2715f69 Michael Hanselmann
    """
1176 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1177 85f03e0d Michael Hanselmann
1178 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1179 04ab05ce Michael Hanselmann
    self._queue_lock = None