Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ cb4e8387

History | View | Annotate | Download (33.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 72737a7f Iustin Pop
from ganeti.rpc import RpcRunner
51 e2715f69 Michael Hanselmann
52 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 70552c46 Michael Hanselmann
def TimeStampNow():
56 ea03467c Iustin Pop
  """Returns the current timestamp.
57 ea03467c Iustin Pop

58 ea03467c Iustin Pop
  @rtype: tuple
59 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
60 ea03467c Iustin Pop

61 ea03467c Iustin Pop
  """
62 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
63 70552c46 Michael Hanselmann
64 70552c46 Michael Hanselmann
65 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
66 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
67 e2715f69 Michael Hanselmann

68 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
69 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
70 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
71 ea03467c Iustin Pop
  @ivar status: the current status
72 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
73 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
74 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
75 f1048938 Iustin Pop

76 e2715f69 Michael Hanselmann
  """
77 85f03e0d Michael Hanselmann
  def __init__(self, op):
78 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
79 ea03467c Iustin Pop

80 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
81 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
82 ea03467c Iustin Pop

83 ea03467c Iustin Pop
    """
84 85f03e0d Michael Hanselmann
    self.input = op
85 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
86 85f03e0d Michael Hanselmann
    self.result = None
87 85f03e0d Michael Hanselmann
    self.log = []
88 70552c46 Michael Hanselmann
    self.start_timestamp = None
89 70552c46 Michael Hanselmann
    self.end_timestamp = None
90 f1da30e6 Michael Hanselmann
91 f1da30e6 Michael Hanselmann
  @classmethod
92 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
93 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
94 ea03467c Iustin Pop

95 ea03467c Iustin Pop
    @type state: dict
96 ea03467c Iustin Pop
    @param state: the serialized state
97 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
98 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
99 ea03467c Iustin Pop

100 ea03467c Iustin Pop
    """
101 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
102 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
103 85f03e0d Michael Hanselmann
    obj.status = state["status"]
104 85f03e0d Michael Hanselmann
    obj.result = state["result"]
105 85f03e0d Michael Hanselmann
    obj.log = state["log"]
106 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
107 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
108 f1da30e6 Michael Hanselmann
    return obj
109 f1da30e6 Michael Hanselmann
110 f1da30e6 Michael Hanselmann
  def Serialize(self):
111 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
112 ea03467c Iustin Pop

113 ea03467c Iustin Pop
    @rtype: dict
114 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
115 ea03467c Iustin Pop

116 ea03467c Iustin Pop
    """
117 6c5a7090 Michael Hanselmann
    return {
118 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
119 6c5a7090 Michael Hanselmann
      "status": self.status,
120 6c5a7090 Michael Hanselmann
      "result": self.result,
121 6c5a7090 Michael Hanselmann
      "log": self.log,
122 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
123 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
124 6c5a7090 Michael Hanselmann
      }
125 f1048938 Iustin Pop
126 e2715f69 Michael Hanselmann
127 e2715f69 Michael Hanselmann
class _QueuedJob(object):
128 e2715f69 Michael Hanselmann
  """In-memory job representation.
129 e2715f69 Michael Hanselmann

130 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
131 ea03467c Iustin Pop
  be taken care of by users of this class.
132 ea03467c Iustin Pop

133 ea03467c Iustin Pop
  @type queue: L{JobQueue}
134 ea03467c Iustin Pop
  @ivar queue: the parent queue
135 ea03467c Iustin Pop
  @ivar id: the job ID
136 ea03467c Iustin Pop
  @type ops: list
137 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
138 ea03467c Iustin Pop
  @type run_op_index: int
139 ea03467c Iustin Pop
  @ivar run_op_index: the currently executing opcode, or -1 if
140 ea03467c Iustin Pop
      we didn't yet start executing
141 ea03467c Iustin Pop
  @type log_serial: int
142 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
143 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
144 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
145 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
146 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
147 e2715f69 Michael Hanselmann

148 e2715f69 Michael Hanselmann
  """
149 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
150 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
151 ea03467c Iustin Pop

152 ea03467c Iustin Pop
    @type queue: L{JobQueue}
153 ea03467c Iustin Pop
    @param queue: our parent queue
154 ea03467c Iustin Pop
    @type job_id: job_id
155 ea03467c Iustin Pop
    @param job_id: our job id
156 ea03467c Iustin Pop
    @type ops: list
157 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
158 ea03467c Iustin Pop
        in _QueuedOpCodes
159 ea03467c Iustin Pop

160 ea03467c Iustin Pop
    """
161 e2715f69 Michael Hanselmann
    if not ops:
162 ea03467c Iustin Pop
      # TODO: use a better exception
163 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
164 e2715f69 Michael Hanselmann
165 85f03e0d Michael Hanselmann
    self.queue = queue
166 f1da30e6 Michael Hanselmann
    self.id = job_id
167 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
168 85f03e0d Michael Hanselmann
    self.run_op_index = -1
169 6c5a7090 Michael Hanselmann
    self.log_serial = 0
170 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
171 c56ec146 Iustin Pop
    self.start_timestamp = None
172 c56ec146 Iustin Pop
    self.end_timestamp = None
173 6c5a7090 Michael Hanselmann
174 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
175 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
176 f1da30e6 Michael Hanselmann
177 f1da30e6 Michael Hanselmann
  @classmethod
178 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
179 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
180 ea03467c Iustin Pop

181 ea03467c Iustin Pop
    @type queue: L{JobQueue}
182 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
183 ea03467c Iustin Pop
    @type state: dict
184 ea03467c Iustin Pop
    @param state: the serialized state
185 ea03467c Iustin Pop
    @rtype: _JobQueue
186 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
187 ea03467c Iustin Pop

188 ea03467c Iustin Pop
    """
189 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
190 85f03e0d Michael Hanselmann
    obj.queue = queue
191 85f03e0d Michael Hanselmann
    obj.id = state["id"]
192 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
193 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
194 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
195 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
196 6c5a7090 Michael Hanselmann
197 6c5a7090 Michael Hanselmann
    obj.ops = []
198 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
199 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
200 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
201 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
202 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
203 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
204 6c5a7090 Michael Hanselmann
205 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
206 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
207 6c5a7090 Michael Hanselmann
208 f1da30e6 Michael Hanselmann
    return obj
209 f1da30e6 Michael Hanselmann
210 f1da30e6 Michael Hanselmann
  def Serialize(self):
211 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
212 ea03467c Iustin Pop

213 ea03467c Iustin Pop
    @rtype: dict
214 ea03467c Iustin Pop
    @return: the serialized state
215 ea03467c Iustin Pop

216 ea03467c Iustin Pop
    """
217 f1da30e6 Michael Hanselmann
    return {
218 f1da30e6 Michael Hanselmann
      "id": self.id,
219 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
220 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
221 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
222 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
223 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
224 f1da30e6 Michael Hanselmann
      }
225 f1da30e6 Michael Hanselmann
226 85f03e0d Michael Hanselmann
  def CalcStatus(self):
227 ea03467c Iustin Pop
    """Compute the status of this job.
228 ea03467c Iustin Pop

229 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
230 ea03467c Iustin Pop
    based on their status, computes the job status.
231 ea03467c Iustin Pop

232 ea03467c Iustin Pop
    The algorithm is:
233 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
234 ea03467c Iustin Pop
        status will be the same
235 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
236 ea03467c Iustin Pop
          - waitlock
237 ea03467c Iustin Pop
          - running
238 ea03467c Iustin Pop

239 ea03467c Iustin Pop
        will determine the job status
240 ea03467c Iustin Pop

241 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
242 ea03467c Iustin Pop
        and the job status will be the same
243 ea03467c Iustin Pop

244 ea03467c Iustin Pop
    @return: the job status
245 ea03467c Iustin Pop

246 ea03467c Iustin Pop
    """
247 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
248 e2715f69 Michael Hanselmann
249 e2715f69 Michael Hanselmann
    all_success = True
250 85f03e0d Michael Hanselmann
    for op in self.ops:
251 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
252 e2715f69 Michael Hanselmann
        continue
253 e2715f69 Michael Hanselmann
254 e2715f69 Michael Hanselmann
      all_success = False
255 e2715f69 Michael Hanselmann
256 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
257 e2715f69 Michael Hanselmann
        pass
258 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
259 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
260 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
261 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
262 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
263 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
264 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
265 f1da30e6 Michael Hanselmann
        break
266 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
267 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
268 4cb1d919 Michael Hanselmann
        break
269 e2715f69 Michael Hanselmann
270 e2715f69 Michael Hanselmann
    if all_success:
271 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
272 e2715f69 Michael Hanselmann
273 e2715f69 Michael Hanselmann
    return status
274 e2715f69 Michael Hanselmann
275 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
276 ea03467c Iustin Pop
    """Selectively returns the log entries.
277 ea03467c Iustin Pop

278 ea03467c Iustin Pop
    @type newer_than: None or int
279 ea03467c Iustin Pop
    @param newer_than: if this is None, return all log enties,
280 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
281 ea03467c Iustin Pop
        than this value
282 ea03467c Iustin Pop
    @rtype: list
283 ea03467c Iustin Pop
    @return: the list of the log entries selected
284 ea03467c Iustin Pop

285 ea03467c Iustin Pop
    """
286 6c5a7090 Michael Hanselmann
    if newer_than is None:
287 6c5a7090 Michael Hanselmann
      serial = -1
288 6c5a7090 Michael Hanselmann
    else:
289 6c5a7090 Michael Hanselmann
      serial = newer_than
290 6c5a7090 Michael Hanselmann
291 6c5a7090 Michael Hanselmann
    entries = []
292 6c5a7090 Michael Hanselmann
    for op in self.ops:
293 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
294 6c5a7090 Michael Hanselmann
295 6c5a7090 Michael Hanselmann
    return entries
296 6c5a7090 Michael Hanselmann
297 f1048938 Iustin Pop
298 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
299 ea03467c Iustin Pop
  """The actual job workers.
300 ea03467c Iustin Pop

301 ea03467c Iustin Pop
  """
302 e92376d7 Iustin Pop
  def _NotifyStart(self):
303 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
304 e92376d7 Iustin Pop

305 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
306 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
307 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
308 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
309 e92376d7 Iustin Pop

310 e92376d7 Iustin Pop
    """
311 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
312 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
313 e92376d7 Iustin Pop
314 e92376d7 Iustin Pop
    self.queue.acquire()
315 e92376d7 Iustin Pop
    try:
316 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
317 e92376d7 Iustin Pop
    finally:
318 e92376d7 Iustin Pop
      self.queue.release()
319 e92376d7 Iustin Pop
320 85f03e0d Michael Hanselmann
  def RunTask(self, job):
321 e2715f69 Michael Hanselmann
    """Job executor.
322 e2715f69 Michael Hanselmann

323 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
324 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
325 e2715f69 Michael Hanselmann

326 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
327 ea03467c Iustin Pop
    @param job: the job to be processed
328 ea03467c Iustin Pop

329 e2715f69 Michael Hanselmann
    """
330 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
331 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
332 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
333 e92376d7 Iustin Pop
    self.queue = queue = job.queue
334 e2715f69 Michael Hanselmann
    try:
335 85f03e0d Michael Hanselmann
      try:
336 85f03e0d Michael Hanselmann
        count = len(job.ops)
337 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
338 85f03e0d Michael Hanselmann
          try:
339 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
340 85f03e0d Michael Hanselmann
341 85f03e0d Michael Hanselmann
            queue.acquire()
342 85f03e0d Michael Hanselmann
            try:
343 85f03e0d Michael Hanselmann
              job.run_op_index = idx
344 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
345 85f03e0d Michael Hanselmann
              op.result = None
346 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
347 c56ec146 Iustin Pop
              if idx == 0: # first opcode
348 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
349 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
350 85f03e0d Michael Hanselmann
351 38206f3c Iustin Pop
              input_opcode = op.input
352 85f03e0d Michael Hanselmann
            finally:
353 85f03e0d Michael Hanselmann
              queue.release()
354 85f03e0d Michael Hanselmann
355 dfe57c22 Michael Hanselmann
            def _Log(*args):
356 6c5a7090 Michael Hanselmann
              """Append a log entry.
357 6c5a7090 Michael Hanselmann

358 6c5a7090 Michael Hanselmann
              """
359 6c5a7090 Michael Hanselmann
              assert len(args) < 3
360 6c5a7090 Michael Hanselmann
361 6c5a7090 Michael Hanselmann
              if len(args) == 1:
362 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
363 6c5a7090 Michael Hanselmann
                log_msg = args[0]
364 6c5a7090 Michael Hanselmann
              else:
365 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
366 6c5a7090 Michael Hanselmann
367 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
368 6c5a7090 Michael Hanselmann
              # precision.
369 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
370 dfe57c22 Michael Hanselmann
371 6c5a7090 Michael Hanselmann
              queue.acquire()
372 dfe57c22 Michael Hanselmann
              try:
373 6c5a7090 Michael Hanselmann
                job.log_serial += 1
374 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
375 6c5a7090 Michael Hanselmann
376 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
377 dfe57c22 Michael Hanselmann
              finally:
378 6c5a7090 Michael Hanselmann
                queue.release()
379 dfe57c22 Michael Hanselmann
380 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
381 e92376d7 Iustin Pop
            self.opcode = op
382 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
383 85f03e0d Michael Hanselmann
384 85f03e0d Michael Hanselmann
            queue.acquire()
385 85f03e0d Michael Hanselmann
            try:
386 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
387 85f03e0d Michael Hanselmann
              op.result = result
388 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
389 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
390 85f03e0d Michael Hanselmann
            finally:
391 85f03e0d Michael Hanselmann
              queue.release()
392 85f03e0d Michael Hanselmann
393 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
394 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
395 85f03e0d Michael Hanselmann
          except Exception, err:
396 85f03e0d Michael Hanselmann
            queue.acquire()
397 85f03e0d Michael Hanselmann
            try:
398 85f03e0d Michael Hanselmann
              try:
399 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
400 85f03e0d Michael Hanselmann
                op.result = str(err)
401 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
402 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
403 85f03e0d Michael Hanselmann
              finally:
404 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
405 85f03e0d Michael Hanselmann
            finally:
406 85f03e0d Michael Hanselmann
              queue.release()
407 85f03e0d Michael Hanselmann
            raise
408 85f03e0d Michael Hanselmann
409 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
410 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
411 85f03e0d Michael Hanselmann
      except:
412 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
413 e2715f69 Michael Hanselmann
    finally:
414 85f03e0d Michael Hanselmann
      queue.acquire()
415 85f03e0d Michael Hanselmann
      try:
416 65548ed5 Michael Hanselmann
        try:
417 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
418 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
419 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
420 65548ed5 Michael Hanselmann
        finally:
421 65548ed5 Michael Hanselmann
          job_id = job.id
422 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
423 85f03e0d Michael Hanselmann
      finally:
424 85f03e0d Michael Hanselmann
        queue.release()
425 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
426 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
427 e2715f69 Michael Hanselmann
428 e2715f69 Michael Hanselmann
429 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
430 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
431 ea03467c Iustin Pop

432 ea03467c Iustin Pop
  """
433 5bdce580 Michael Hanselmann
  def __init__(self, queue):
434 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
435 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
436 5bdce580 Michael Hanselmann
    self.queue = queue
437 e2715f69 Michael Hanselmann
438 e2715f69 Michael Hanselmann
439 85f03e0d Michael Hanselmann
class JobQueue(object):
440 ea03467c Iustin Pop
  """Quue used to manaage the jobs.
441 ea03467c Iustin Pop

442 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
443 ea03467c Iustin Pop

444 ea03467c Iustin Pop
  """
445 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
446 f1da30e6 Michael Hanselmann
447 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
448 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
449 db37da70 Michael Hanselmann

450 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
451 ea03467c Iustin Pop
    functions usually called from other classes.
452 db37da70 Michael Hanselmann

453 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
454 db37da70 Michael Hanselmann

455 ea03467c Iustin Pop
    Example::
456 db37da70 Michael Hanselmann
      @utils.LockedMethod
457 db37da70 Michael Hanselmann
      @_RequireOpenQueue
458 db37da70 Michael Hanselmann
      def Example(self):
459 db37da70 Michael Hanselmann
        pass
460 db37da70 Michael Hanselmann

461 db37da70 Michael Hanselmann
    """
462 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
463 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
464 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
465 db37da70 Michael Hanselmann
    return wrapper
466 db37da70 Michael Hanselmann
467 85f03e0d Michael Hanselmann
  def __init__(self, context):
468 ea03467c Iustin Pop
    """Constructor for JobQueue.
469 ea03467c Iustin Pop

470 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
471 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
472 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
473 ea03467c Iustin Pop
    running).
474 ea03467c Iustin Pop

475 ea03467c Iustin Pop
    @type context: GanetiContext
476 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
477 ea03467c Iustin Pop
        data and other ganeti objects
478 ea03467c Iustin Pop

479 ea03467c Iustin Pop
    """
480 5bdce580 Michael Hanselmann
    self.context = context
481 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
482 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
483 f1da30e6 Michael Hanselmann
484 85f03e0d Michael Hanselmann
    # Locking
485 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
486 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
487 85f03e0d Michael Hanselmann
    self.release = self._lock.release
488 85f03e0d Michael Hanselmann
489 04ab05ce Michael Hanselmann
    # Initialize
490 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
491 f1da30e6 Michael Hanselmann
492 04ab05ce Michael Hanselmann
    # Read serial file
493 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
494 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
495 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
496 c4beba1c Iustin Pop
497 23752136 Michael Hanselmann
    # Get initial list of nodes
498 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
499 99aabbed Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values())
500 8e00939c Michael Hanselmann
501 8e00939c Michael Hanselmann
    # Remove master node
502 8e00939c Michael Hanselmann
    try:
503 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
504 8e00939c Michael Hanselmann
    except ValueError:
505 8e00939c Michael Hanselmann
      pass
506 23752136 Michael Hanselmann
507 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
508 23752136 Michael Hanselmann
509 85f03e0d Michael Hanselmann
    # Setup worker pool
510 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
511 85f03e0d Michael Hanselmann
512 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
513 85f03e0d Michael Hanselmann
    # we're still doing our work.
514 85f03e0d Michael Hanselmann
    self.acquire()
515 85f03e0d Michael Hanselmann
    try:
516 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
517 94ed59a5 Iustin Pop
        # a failure in loading the job can cause 'None' to be returned
518 94ed59a5 Iustin Pop
        if job is None:
519 94ed59a5 Iustin Pop
          continue
520 94ed59a5 Iustin Pop
521 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
522 85f03e0d Michael Hanselmann
523 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
524 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
525 85f03e0d Michael Hanselmann
526 e92376d7 Iustin Pop
        elif status in (constants.JOB_STATUS_RUNNING,
527 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
528 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
529 85f03e0d Michael Hanselmann
          try:
530 85f03e0d Michael Hanselmann
            for op in job.ops:
531 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
532 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
533 85f03e0d Michael Hanselmann
          finally:
534 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
535 85f03e0d Michael Hanselmann
    finally:
536 85f03e0d Michael Hanselmann
      self.release()
537 85f03e0d Michael Hanselmann
538 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
539 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
540 99aabbed Iustin Pop
  def AddNode(self, node):
541 99aabbed Iustin Pop
    """Register a new node with the queue.
542 99aabbed Iustin Pop

543 99aabbed Iustin Pop
    @type node: L{objects.Node}
544 99aabbed Iustin Pop
    @param node: the node object to be added
545 99aabbed Iustin Pop

546 99aabbed Iustin Pop
    """
547 99aabbed Iustin Pop
    node_name = node.name
548 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
549 23752136 Michael Hanselmann
550 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
551 72737a7f Iustin Pop
    RpcRunner.call_jobqueue_purge(node_name)
552 23752136 Michael Hanselmann
553 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
554 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
555 23752136 Michael Hanselmann
556 d2e03a33 Michael Hanselmann
    # Upload current serial file
557 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
558 d2e03a33 Michael Hanselmann
559 d2e03a33 Michael Hanselmann
    for file_name in files:
560 9f774ee8 Michael Hanselmann
      # Read file content
561 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
562 9f774ee8 Michael Hanselmann
      try:
563 9f774ee8 Michael Hanselmann
        content = fd.read()
564 9f774ee8 Michael Hanselmann
      finally:
565 9f774ee8 Michael Hanselmann
        fd.close()
566 9f774ee8 Michael Hanselmann
567 99aabbed Iustin Pop
      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
568 99aabbed Iustin Pop
                                              file_name, content)
569 d2e03a33 Michael Hanselmann
      if not result[node_name]:
570 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
571 d2e03a33 Michael Hanselmann
572 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
573 d2e03a33 Michael Hanselmann
574 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
575 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
576 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
577 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
578 ea03467c Iustin Pop

579 ea03467c Iustin Pop
    @type node_name: str
580 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
581 ea03467c Iustin Pop

582 ea03467c Iustin Pop
    """
583 23752136 Michael Hanselmann
    try:
584 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
585 99aabbed Iustin Pop
      del self._nodes[node_name]
586 d2e03a33 Michael Hanselmann
    except KeyError:
587 23752136 Michael Hanselmann
      pass
588 23752136 Michael Hanselmann
589 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
590 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
591 ea03467c Iustin Pop

592 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
593 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
594 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
595 ea03467c Iustin Pop

596 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
597 ea03467c Iustin Pop
    @type nodes: list
598 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
599 ea03467c Iustin Pop
    @type failmsg: str
600 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
601 ea03467c Iustin Pop

602 ea03467c Iustin Pop
    """
603 e74798c1 Michael Hanselmann
    failed = []
604 e74798c1 Michael Hanselmann
    success = []
605 e74798c1 Michael Hanselmann
606 e74798c1 Michael Hanselmann
    for node in nodes:
607 e74798c1 Michael Hanselmann
      if result[node]:
608 e74798c1 Michael Hanselmann
        success.append(node)
609 e74798c1 Michael Hanselmann
      else:
610 e74798c1 Michael Hanselmann
        failed.append(node)
611 e74798c1 Michael Hanselmann
612 e74798c1 Michael Hanselmann
    if failed:
613 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
614 e74798c1 Michael Hanselmann
615 e74798c1 Michael Hanselmann
    # +1 for the master node
616 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
617 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
618 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
619 e74798c1 Michael Hanselmann
620 99aabbed Iustin Pop
  def _GetNodeIp(self):
621 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
622 99aabbed Iustin Pop

623 ea03467c Iustin Pop
    @rtype: (list, list)
624 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
625 ea03467c Iustin Pop
        names and the second one with the node addresses
626 ea03467c Iustin Pop

627 99aabbed Iustin Pop
    """
628 99aabbed Iustin Pop
    name_list = self._nodes.keys()
629 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
630 99aabbed Iustin Pop
    return name_list, addr_list
631 99aabbed Iustin Pop
632 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
633 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
634 8e00939c Michael Hanselmann

635 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
636 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
637 ea03467c Iustin Pop

638 ea03467c Iustin Pop
    @type file_name: str
639 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
640 ea03467c Iustin Pop
    @type data: str
641 ea03467c Iustin Pop
    @param data: the new contents of the file
642 ea03467c Iustin Pop

643 8e00939c Michael Hanselmann
    """
644 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
645 8e00939c Michael Hanselmann
646 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
647 99aabbed Iustin Pop
    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
648 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
649 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
650 23752136 Michael Hanselmann
651 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
652 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
653 ea03467c Iustin Pop

654 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
655 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
656 ea03467c Iustin Pop

657 ea03467c Iustin Pop
    @type old: str
658 ea03467c Iustin Pop
    @param old: the current name of the file
659 ea03467c Iustin Pop
    @type new: str
660 ea03467c Iustin Pop
    @param new: the new name of the file
661 ea03467c Iustin Pop

662 ea03467c Iustin Pop
    """
663 abc1f2ce Michael Hanselmann
    os.rename(old, new)
664 abc1f2ce Michael Hanselmann
665 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
666 99aabbed Iustin Pop
    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
667 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
668 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
669 abc1f2ce Michael Hanselmann
670 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
671 ea03467c Iustin Pop
    """Convert a job ID to string format.
672 ea03467c Iustin Pop

673 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
674 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
675 ea03467c Iustin Pop
    abstract this change.
676 ea03467c Iustin Pop

677 ea03467c Iustin Pop
    @type job_id: int or long
678 ea03467c Iustin Pop
    @param job_id: the numeric job id
679 ea03467c Iustin Pop
    @rtype: str
680 ea03467c Iustin Pop
    @return: the formatted job id
681 ea03467c Iustin Pop

682 ea03467c Iustin Pop
    """
683 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
684 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
685 85f03e0d Michael Hanselmann
    if job_id < 0:
686 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
687 85f03e0d Michael Hanselmann
688 85f03e0d Michael Hanselmann
    return str(job_id)
689 85f03e0d Michael Hanselmann
690 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
691 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
692 f1da30e6 Michael Hanselmann

693 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
694 f1da30e6 Michael Hanselmann

695 ea03467c Iustin Pop
    @rtype: str
696 ea03467c Iustin Pop
    @return: a string representing the job identifier.
697 f1da30e6 Michael Hanselmann

698 f1da30e6 Michael Hanselmann
    """
699 f1da30e6 Michael Hanselmann
    # New number
700 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
701 f1da30e6 Michael Hanselmann
702 f1da30e6 Michael Hanselmann
    # Write to file
703 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
704 23752136 Michael Hanselmann
                                        "%s\n" % serial)
705 f1da30e6 Michael Hanselmann
706 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
707 f1da30e6 Michael Hanselmann
    self._last_serial = serial
708 f1da30e6 Michael Hanselmann
709 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
710 f1da30e6 Michael Hanselmann
711 85f03e0d Michael Hanselmann
  @staticmethod
712 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
713 ea03467c Iustin Pop
    """Returns the job file for a given job id.
714 ea03467c Iustin Pop

715 ea03467c Iustin Pop
    @type job_id: str
716 ea03467c Iustin Pop
    @param job_id: the job identifier
717 ea03467c Iustin Pop
    @rtype: str
718 ea03467c Iustin Pop
    @return: the path to the job file
719 ea03467c Iustin Pop

720 ea03467c Iustin Pop
    """
721 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
722 f1da30e6 Michael Hanselmann
723 85f03e0d Michael Hanselmann
  @staticmethod
724 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
725 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
726 ea03467c Iustin Pop

727 ea03467c Iustin Pop
    @type job_id: str
728 ea03467c Iustin Pop
    @param job_id: the job identifier
729 ea03467c Iustin Pop
    @rtype: str
730 ea03467c Iustin Pop
    @return: the path to the archived job file
731 ea03467c Iustin Pop

732 ea03467c Iustin Pop
    """
733 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
734 0cb94105 Michael Hanselmann
735 85f03e0d Michael Hanselmann
  @classmethod
736 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
737 ea03467c Iustin Pop
    """Extract the job id from a filename.
738 ea03467c Iustin Pop

739 ea03467c Iustin Pop
    @type name: str
740 ea03467c Iustin Pop
    @param name: the job filename
741 ea03467c Iustin Pop
    @rtype: job id or None
742 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
743 ea03467c Iustin Pop
        or None if the filename does not represent a valid
744 ea03467c Iustin Pop
        job file
745 ea03467c Iustin Pop

746 ea03467c Iustin Pop
    """
747 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
748 fae737ac Michael Hanselmann
    if m:
749 fae737ac Michael Hanselmann
      return m.group(1)
750 fae737ac Michael Hanselmann
    else:
751 fae737ac Michael Hanselmann
      return None
752 fae737ac Michael Hanselmann
753 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
754 911a495b Iustin Pop
    """Return all known job IDs.
755 911a495b Iustin Pop

756 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
757 911a495b Iustin Pop
    included. Currently this argument is unused.
758 911a495b Iustin Pop

759 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
760 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
761 ac0930b9 Iustin Pop
    extra IDs).
762 ac0930b9 Iustin Pop

763 ea03467c Iustin Pop
    @rtype: list
764 ea03467c Iustin Pop
    @return: the list of job IDs
765 ea03467c Iustin Pop

766 911a495b Iustin Pop
    """
767 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
768 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
769 f0d874fe Iustin Pop
    return jlist
770 911a495b Iustin Pop
771 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
772 ea03467c Iustin Pop
    """Returns the list of current job files.
773 ea03467c Iustin Pop

774 ea03467c Iustin Pop
    @rtype: list
775 ea03467c Iustin Pop
    @return: the list of job file names
776 ea03467c Iustin Pop

777 ea03467c Iustin Pop
    """
778 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
779 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
780 f1da30e6 Michael Hanselmann
781 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
782 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
783 ea03467c Iustin Pop

784 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
785 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
786 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
787 ea03467c Iustin Pop

788 ea03467c Iustin Pop
    @param job_id: the job id
789 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
790 ea03467c Iustin Pop
    @return: either None or the job object
791 ea03467c Iustin Pop

792 ea03467c Iustin Pop
    """
793 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
794 5685c1a5 Michael Hanselmann
    if job:
795 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
796 5685c1a5 Michael Hanselmann
      return job
797 ac0930b9 Iustin Pop
798 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
799 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
800 f1da30e6 Michael Hanselmann
    try:
801 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
802 f1da30e6 Michael Hanselmann
    except IOError, err:
803 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
804 f1da30e6 Michael Hanselmann
        return None
805 f1da30e6 Michael Hanselmann
      raise
806 f1da30e6 Michael Hanselmann
    try:
807 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
808 f1da30e6 Michael Hanselmann
    finally:
809 f1da30e6 Michael Hanselmann
      fd.close()
810 f1da30e6 Michael Hanselmann
811 94ed59a5 Iustin Pop
    try:
812 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
813 94ed59a5 Iustin Pop
    except Exception, err:
814 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
815 94ed59a5 Iustin Pop
      if filepath == new_path:
816 94ed59a5 Iustin Pop
        # job already archived (future case)
817 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
818 94ed59a5 Iustin Pop
      else:
819 94ed59a5 Iustin Pop
        # non-archived case
820 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
821 94ed59a5 Iustin Pop
        self._RenameFileUnlocked(filepath, new_path)
822 94ed59a5 Iustin Pop
      return None
823 94ed59a5 Iustin Pop
824 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
825 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
826 ac0930b9 Iustin Pop
    return job
827 f1da30e6 Michael Hanselmann
828 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
829 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
830 ea03467c Iustin Pop

831 ea03467c Iustin Pop
    @type job_ids: list
832 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
833 ea03467c Iustin Pop
        or a list of job IDs
834 ea03467c Iustin Pop
    @rtype: list
835 ea03467c Iustin Pop
    @return: the list of job objects
836 ea03467c Iustin Pop

837 ea03467c Iustin Pop
    """
838 911a495b Iustin Pop
    if not job_ids:
839 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
840 f1da30e6 Michael Hanselmann
841 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
842 f1da30e6 Michael Hanselmann
843 686d7433 Iustin Pop
  @staticmethod
844 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
845 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
846 686d7433 Iustin Pop

847 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
848 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
849 686d7433 Iustin Pop

850 ea03467c Iustin Pop
    @rtype: boolean
851 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
852 ea03467c Iustin Pop

853 686d7433 Iustin Pop
    """
854 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
855 686d7433 Iustin Pop
856 3ccafd0e Iustin Pop
  @staticmethod
857 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
858 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
859 3ccafd0e Iustin Pop

860 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
861 3ccafd0e Iustin Pop
    and in the future we might merge them.
862 3ccafd0e Iustin Pop

863 ea03467c Iustin Pop
    @type drain_flag: boolean
864 ea03467c Iustin Pop
    @param drain_flag: wheter to set or unset the drain flag
865 ea03467c Iustin Pop

866 3ccafd0e Iustin Pop
    """
867 3ccafd0e Iustin Pop
    if drain_flag:
868 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
869 3ccafd0e Iustin Pop
    else:
870 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
871 3ccafd0e Iustin Pop
    return True
872 3ccafd0e Iustin Pop
873 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
874 db37da70 Michael Hanselmann
  @_RequireOpenQueue
875 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
876 85f03e0d Michael Hanselmann
    """Create and store a new job.
877 f1da30e6 Michael Hanselmann

878 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
879 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
880 c3f0a12f Iustin Pop

881 c3f0a12f Iustin Pop
    @type ops: list
882 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
883 ea03467c Iustin Pop
    @rtype: job ID
884 ea03467c Iustin Pop
    @return: the job ID of the newly created job
885 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
886 c3f0a12f Iustin Pop

887 c3f0a12f Iustin Pop
    """
888 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
889 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
890 f1da30e6 Michael Hanselmann
    # Get job identifier
891 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
892 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
893 f1da30e6 Michael Hanselmann
894 f1da30e6 Michael Hanselmann
    # Write to disk
895 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
896 f1da30e6 Michael Hanselmann
897 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
898 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
899 ac0930b9 Iustin Pop
900 85f03e0d Michael Hanselmann
    # Add to worker pool
901 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
902 85f03e0d Michael Hanselmann
903 85f03e0d Michael Hanselmann
    return job.id
904 f1da30e6 Michael Hanselmann
905 db37da70 Michael Hanselmann
  @_RequireOpenQueue
906 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
907 ea03467c Iustin Pop
    """Update a job's on disk storage.
908 ea03467c Iustin Pop

909 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
910 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
911 ea03467c Iustin Pop
    nodes.
912 ea03467c Iustin Pop

913 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
914 ea03467c Iustin Pop
    @param job: the changed job
915 ea03467c Iustin Pop

916 ea03467c Iustin Pop
    """
917 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
918 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
919 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
920 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
921 ac0930b9 Iustin Pop
922 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
923 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
924 dfe57c22 Michael Hanselmann
925 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
926 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
927 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
928 5c735209 Iustin Pop
                        timeout):
929 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
930 6c5a7090 Michael Hanselmann

931 6c5a7090 Michael Hanselmann
    @type job_id: string
932 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
933 6c5a7090 Michael Hanselmann
    @type fields: list of strings
934 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
935 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
936 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
937 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
938 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
939 5c735209 Iustin Pop
    @type timeout: float
940 5c735209 Iustin Pop
    @param timeout: maximum time to wait
941 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
942 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
943 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
944 ea03467c Iustin Pop

945 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
946 ea03467c Iustin Pop
        we instead return a special value,
947 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
948 ea03467c Iustin Pop
        as such by the clients
949 6c5a7090 Michael Hanselmann

950 6c5a7090 Michael Hanselmann
    """
951 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
952 5c735209 Iustin Pop
    end_time = time.time() + timeout
953 dfe57c22 Michael Hanselmann
    while True:
954 5c735209 Iustin Pop
      delta_time = end_time - time.time()
955 5c735209 Iustin Pop
      if delta_time < 0:
956 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
957 5c735209 Iustin Pop
958 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
959 6c5a7090 Michael Hanselmann
      if not job:
960 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
961 6c5a7090 Michael Hanselmann
        break
962 dfe57c22 Michael Hanselmann
963 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
964 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
965 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
966 dfe57c22 Michael Hanselmann
967 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
968 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
969 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
970 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
971 dfe57c22 Michael Hanselmann
      # significantly different.
972 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
973 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
974 dfe57c22 Michael Hanselmann
975 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
976 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
977 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
978 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
979 6c5a7090 Michael Hanselmann
        # no changes.
980 dfe57c22 Michael Hanselmann
        break
981 dfe57c22 Michael Hanselmann
982 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
983 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
984 6c5a7090 Michael Hanselmann
        break
985 6c5a7090 Michael Hanselmann
986 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
987 6c5a7090 Michael Hanselmann
988 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
989 5c735209 Iustin Pop
      job.change.wait(delta_time)
990 dfe57c22 Michael Hanselmann
991 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
992 dfe57c22 Michael Hanselmann
993 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
994 dfe57c22 Michael Hanselmann
995 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
996 db37da70 Michael Hanselmann
  @_RequireOpenQueue
997 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
998 188c5e0a Michael Hanselmann
    """Cancels a job.
999 188c5e0a Michael Hanselmann

1000 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1001 ea03467c Iustin Pop

1002 188c5e0a Michael Hanselmann
    @type job_id: string
1003 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1004 188c5e0a Michael Hanselmann

1005 188c5e0a Michael Hanselmann
    """
1006 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
1007 188c5e0a Michael Hanselmann
1008 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1009 188c5e0a Michael Hanselmann
    if not job:
1010 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1011 188c5e0a Michael Hanselmann
      return
1012 188c5e0a Michael Hanselmann
1013 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
1014 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
1015 188c5e0a Michael Hanselmann
      return
1016 188c5e0a Michael Hanselmann
1017 85f03e0d Michael Hanselmann
    try:
1018 85f03e0d Michael Hanselmann
      for op in job.ops:
1019 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1020 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
1021 85f03e0d Michael Hanselmann
    finally:
1022 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1023 188c5e0a Michael Hanselmann
1024 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1025 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
1026 c609f802 Michael Hanselmann
    """Archives a job.
1027 c609f802 Michael Hanselmann

1028 c609f802 Michael Hanselmann
    @type job_id: string
1029 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
1030 c609f802 Michael Hanselmann

1031 c609f802 Michael Hanselmann
    """
1032 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
1033 c609f802 Michael Hanselmann
1034 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1035 c609f802 Michael Hanselmann
    if not job:
1036 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1037 c609f802 Michael Hanselmann
      return
1038 c609f802 Michael Hanselmann
1039 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1040 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
1041 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
1042 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
1043 c609f802 Michael Hanselmann
      return
1044 c609f802 Michael Hanselmann
1045 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
1046 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
1047 c609f802 Michael Hanselmann
1048 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
1049 c609f802 Michael Hanselmann
1050 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
1051 f1da30e6 Michael Hanselmann
1052 07cd723a Iustin Pop
  @utils.LockedMethod
1053 07cd723a Iustin Pop
  @_RequireOpenQueue
1054 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1055 07cd723a Iustin Pop
    """Archives a job.
1056 07cd723a Iustin Pop

1057 ea03467c Iustin Pop
    This is just a wrapper over L{_ArchiveJobUnlocked}.
1058 ea03467c Iustin Pop

1059 07cd723a Iustin Pop
    @type job_id: string
1060 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1061 07cd723a Iustin Pop

1062 07cd723a Iustin Pop
    """
1063 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
1064 07cd723a Iustin Pop
1065 07cd723a Iustin Pop
  @utils.LockedMethod
1066 07cd723a Iustin Pop
  @_RequireOpenQueue
1067 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
1068 07cd723a Iustin Pop
    """Archives all jobs based on age.
1069 07cd723a Iustin Pop

1070 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1071 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1072 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1073 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1074 07cd723a Iustin Pop

1075 07cd723a Iustin Pop
    @type age: int
1076 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1077 07cd723a Iustin Pop

1078 07cd723a Iustin Pop
    """
1079 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1080 07cd723a Iustin Pop
1081 07cd723a Iustin Pop
    now = time.time()
1082 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
1083 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
1084 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1085 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
1086 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
1087 07cd723a Iustin Pop
        continue
1088 07cd723a Iustin Pop
      if job.end_timestamp is None:
1089 07cd723a Iustin Pop
        if job.start_timestamp is None:
1090 07cd723a Iustin Pop
          job_age = job.received_timestamp
1091 07cd723a Iustin Pop
        else:
1092 07cd723a Iustin Pop
          job_age = job.start_timestamp
1093 07cd723a Iustin Pop
      else:
1094 07cd723a Iustin Pop
        job_age = job.end_timestamp
1095 07cd723a Iustin Pop
1096 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
1097 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
1098 07cd723a Iustin Pop
1099 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1100 ea03467c Iustin Pop
    """Returns information about a job.
1101 ea03467c Iustin Pop

1102 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1103 ea03467c Iustin Pop
    @param job: the job which we query
1104 ea03467c Iustin Pop
    @type fields: list
1105 ea03467c Iustin Pop
    @param fields: names of fields to return
1106 ea03467c Iustin Pop
    @rtype: list
1107 ea03467c Iustin Pop
    @return: list with one element for each field
1108 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1109 ea03467c Iustin Pop
        has been passed
1110 ea03467c Iustin Pop

1111 ea03467c Iustin Pop
    """
1112 e2715f69 Michael Hanselmann
    row = []
1113 e2715f69 Michael Hanselmann
    for fname in fields:
1114 e2715f69 Michael Hanselmann
      if fname == "id":
1115 e2715f69 Michael Hanselmann
        row.append(job.id)
1116 e2715f69 Michael Hanselmann
      elif fname == "status":
1117 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1118 af30b2fd Michael Hanselmann
      elif fname == "ops":
1119 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1120 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1121 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1122 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1123 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1124 5b23c34c Iustin Pop
      elif fname == "oplog":
1125 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1126 c56ec146 Iustin Pop
      elif fname == "opstart":
1127 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1128 c56ec146 Iustin Pop
      elif fname == "opend":
1129 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1130 c56ec146 Iustin Pop
      elif fname == "received_ts":
1131 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1132 c56ec146 Iustin Pop
      elif fname == "start_ts":
1133 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1134 c56ec146 Iustin Pop
      elif fname == "end_ts":
1135 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1136 60dd1473 Iustin Pop
      elif fname == "summary":
1137 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1138 e2715f69 Michael Hanselmann
      else:
1139 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1140 e2715f69 Michael Hanselmann
    return row
1141 e2715f69 Michael Hanselmann
1142 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1143 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1144 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1145 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1146 e2715f69 Michael Hanselmann

1147 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1148 ea03467c Iustin Pop
    processing for each job.
1149 ea03467c Iustin Pop

1150 ea03467c Iustin Pop
    @type job_ids: list
1151 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1152 ea03467c Iustin Pop
    @type fields: list
1153 ea03467c Iustin Pop
    @param fields: names of fields to return
1154 ea03467c Iustin Pop
    @rtype: list
1155 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1156 ea03467c Iustin Pop
        the requested fields
1157 e2715f69 Michael Hanselmann

1158 e2715f69 Michael Hanselmann
    """
1159 85f03e0d Michael Hanselmann
    jobs = []
1160 e2715f69 Michael Hanselmann
1161 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1162 85f03e0d Michael Hanselmann
      if job is None:
1163 85f03e0d Michael Hanselmann
        jobs.append(None)
1164 85f03e0d Michael Hanselmann
      else:
1165 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1166 e2715f69 Michael Hanselmann
1167 85f03e0d Michael Hanselmann
    return jobs
1168 e2715f69 Michael Hanselmann
1169 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1170 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1171 e2715f69 Michael Hanselmann
  def Shutdown(self):
1172 e2715f69 Michael Hanselmann
    """Stops the job queue.
1173 e2715f69 Michael Hanselmann

1174 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1175 ea03467c Iustin Pop

1176 e2715f69 Michael Hanselmann
    """
1177 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1178 85f03e0d Michael Hanselmann
1179 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1180 04ab05ce Michael Hanselmann
    self._queue_lock = None