Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 33987705

History | View | Annotate | Download (34.2 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
51 e2715f69 Michael Hanselmann
52 498ae1cc Iustin Pop
53 70552c46 Michael Hanselmann
def TimeStampNow():
  """Return the current time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format,
      as produced by L{utils.SplitTime}

  """
  now = time.time()
  return utils.SplitTime(now)
61 70552c46 Michael Hanselmann
62 70552c46 Michael Hanselmann
63 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object together with its execution state.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    A freshly wrapped opcode is in the queued state, has no result,
    an empty log and no timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.log = []
    self.result = None
    self.status = constants.OP_STATUS_QUEUED
    self.start_timestamp = self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Rebuild a _QueuedOpCode from its serialized form.

    The constructor is bypassed; all attributes are taken from the
    serialized state. The timestamps are optional in the state and
    default to None when absent.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])

    # Mandatory entries
    for key in ("status", "result", "log"):
      setattr(restored, key, state[key])

    # Optional entries
    for key in ("start_timestamp", "end_timestamp"):
      setattr(restored, key, state.get(key))

    return restored

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    serialized = {"input": self.input.__getstate__()}

    plain_attributes = ("status", "result", "log",
                        "start_timestamp", "end_timestamp")
    for key in plain_attributes:
      serialized[key] = getattr(self, key)

    return serialized
123 f1048938 Iustin Pop
124 e2715f69 Michael Hanselmann
125 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: the serial number of the most recent log entry
      (0 if nothing was logged yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if no opcodes were given

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.id = job_id
    self.queue = queue
    self.ops = [_QueuedOpCode(opcode) for opcode in ops]

    # Nothing has been executed or logged yet
    self.run_op_index = -1
    self.log_serial = 0

    self.received_timestamp = TimeStampNow()
    self.start_timestamp = self.end_timestamp = None

    # Condition to wait for changes; it shares the queue's lock
    self.change = threading.Condition(queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    The constructor is bypassed. The log serial is recomputed as the
    highest serial found in the logs of the restored opcodes.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    job = _QueuedJob.__new__(cls)
    job.queue = queue
    job.id = state["id"]
    job.run_op_index = state["run_op_index"]

    # Timestamps are optional in the serialized state
    for key in ("received_timestamp", "start_timestamp", "end_timestamp"):
      setattr(job, key, state.get(key))

    restored_ops = []
    highest_serial = 0
    for op_state in state["ops"]:
      restored = _QueuedOpCode.Restore(op_state)
      for entry in restored.log:
        highest_serial = max(highest_serial, entry[0])
      restored_ops.append(restored)

    job.ops = restored_ops
    job.log_serial = highest_serial

    # Condition to wait for changes; it shares the queue's lock
    job.change = threading.Condition(queue._lock)

    return job

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    serialized = {
      "id": self.id,
      "ops": [queued_op.Serialize() for queued_op in self.ops],
      }

    for key in ("run_op_index", "start_timestamp", "end_timestamp",
                "received_timestamp"):
      serialized[key] = getattr(self, key)

    return serialized

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped
      - the first opcode found in error or canceled state ends the
        scan and determines the job status
      - otherwise, the last opcode that is waiting for locks or
        running determines the job status
      - otherwise, if every opcode succeeded the job is successful,
        else the job is still queued

    @return: the job status

    """
    job_status = constants.JOB_STATUS_QUEUED
    found_unfinished = False

    for queued_op in self.ops:
      op_status = queued_op.status

      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      found_unfinished = True

      if op_status == constants.OP_STATUS_QUEUED:
        continue

      if op_status == constants.OP_STATUS_WAITLOCK:
        job_status = constants.JOB_STATUS_WAITLOCK
      elif op_status == constants.OP_STATUS_RUNNING:
        job_status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        # The whole job fails if one opcode failed
        job_status = constants.JOB_STATUS_ERROR
        break
      elif op_status == constants.OP_STATUS_CANCELED:
        # NOTE(review): the opcode-level constant is returned as the job
        # status here; presumably it equals the job-level one - confirm
        job_status = constants.OP_STATUS_CANCELED
        break

    if not found_unfinished:
      return constants.JOB_STATUS_SUCCESS

    return job_status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      threshold = -1
    else:
      threshold = newer_than

    selected = []
    for queued_op in self.ops:
      selected.extend([entry for entry in queued_op.log
                       if entry[0] > threshold])

    return selected
294 6c5a7090 Michael Hanselmann
295 f1048938 Iustin Pop
296 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  A worker runs the opcodes of a job in order and records status,
  result and timestamps on the job and its opcodes while holding the
  queue lock.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    The opcodes are executed in order. Before an opcode is executed it
    is marked as waiting for locks; after it finished it is marked as
    successful and its result is stored. If executing an opcode raises
    an exception, the opcode is marked as failed with the exception
    text as result, the exception is logged and the remaining opcodes
    are not executed. In all cases the job's running index is reset to
    -1 and its end timestamp is set before the job is written one last
    time.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    # Local import: only needed to fetch the exception being handled
    import sys

    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              Accepts either C{(message)} or C{(type, message)}; with a
              single argument the type is ELOG_MESSAGE.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = TimeStampNow()

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                # Wake up everybody waiting for changes of this job
                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception:
            # Fetched via sys.exc_info() so that this handler does not
            # depend on a version-specific "except" binding syntax
            err = sys.exc_info()[1]
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                # Write the job even if recording the error failed
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            # Abort the job; the exception is logged by the outer handlers
            raise

      except errors.GenericError:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          # No opcode is being executed anymore
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          # Collected under the lock, used for logging after releasing it
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
425 e2715f69 Michael Hanselmann
426 e2715f69 Michael Hanselmann
427 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool dedicated to processing jobs.

  The base class is initialized with L{JOBQUEUE_THREADS} and
  L{_JobQueueWorker}; the pool also keeps a reference to its queue.

  """
  def __init__(self, queue):
    """Constructor for the _JobQueueWorkerPool.

    @type queue: L{JobQueue}
    @param queue: the job queue this pool works for

    """
    num_workers = JOBQUEUE_THREADS
    worker_class = _JobQueueWorker
    super(_JobQueueWorkerPool, self).__init__(num_workers, worker_class)
    self.queue = queue
435 e2715f69 Michael Hanselmann
436 e2715f69 Michael Hanselmann
437 85f03e0d Michael Hanselmann
class JobQueue(object):
438 ea03467c Iustin Pop
  """Queue used to manage the jobs.
439 ea03467c Iustin Pop

440 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
441 ea03467c Iustin Pop

442 ea03467c Iustin Pop
  """
443 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
444 f1da30e6 Michael Hanselmann
445 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
    """Decorator asserting that the queue is open.

    To be applied to all 'public' methods, that is, methods usually
    called from other classes. The decorated method asserts that
    C{self._queue_lock} is not None before calling the original one.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      queue_is_open = self._queue_lock is not None
      assert queue_is_open, "Queue should be open"
      return fn(self, *args, **kwargs)

    return wrapper
464 db37da70 Michael Hanselmann
465 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor initializes the job queue object and then loads
    the current jobs from disk: jobs found in queued state are handed
    to the worker pool, while jobs found running or waiting for locks
    have all their opcodes marked as failed, since the master daemon
    was not shut down cleanly.

    If anything fails during the inspection, the workers are
    terminated and the exception is re-raised.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking: a single lock, exported through acquire/release
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (name to primary IP)...
    all_nodes = self.context.cfg.GetAllNodesInfo().values()
    self._nodes = dict((node.name, node.primary_ip) for node in all_nodes)

    # ... and remove the master node, if present
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
            jobs_count = len(all_job_ids)
            percent_done = 100.0 * (idx + 1) / jobs_count
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count, percent_done)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status == constants.JOB_STATUS_QUEUED:
            self._wpool.AddTask(job)
            continue

          if status not in (constants.JOB_STATUS_RUNNING,
                            constants.JOB_STATUS_WAITLOCK):
            # Nothing to do for jobs in any other state
            continue

          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise
553 85f03e0d Michael Hanselmann
554 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Bring a newly added node in sync with the job queue.

    The queue directory on the node is purged, then all non-archived
    job files and the serial file are uploaded to it. Finally the node
    is remembered in the internal node list.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    target = node.name
    assert target != self._my_hostname

    # Start with an empty queue directory on the new node
    rpc.RpcRunner.call_jobqueue_purge(target)

    # All current (non-archived) job files, followed by the serial file
    to_upload = []
    for jid in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(jid))
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Load the local copy of the file
      handle = open(path, "r")
      try:
        payload = handle.read()
      finally:
        handle.close()

      reply = rpc.RpcRunner.call_jobqueue_update([target],
                                                 [node.primary_ip],
                                                 path, payload)
      if not reply[target]:
        # Upload failures are only logged, the remaining files are still sent
        logging.error("Failed to upload %s to %s", path, target)

    self._nodes[target] = node.primary_ip
590 d2e03a33 Michael Hanselmann
591 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Forget a node which is being removed from the cluster.

    Only the internal node list is updated; names which are not in the
    list are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)
605 23752136 Michael Hanselmann
606 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
607 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
608 ea03467c Iustin Pop

609 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
610 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
611 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
612 ea03467c Iustin Pop

613 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
614 ea03467c Iustin Pop
    @type nodes: list
615 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
616 ea03467c Iustin Pop
    @type failmsg: str
617 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
618 ea03467c Iustin Pop

619 ea03467c Iustin Pop
    """
620 e74798c1 Michael Hanselmann
    failed = []
621 e74798c1 Michael Hanselmann
    success = []
622 e74798c1 Michael Hanselmann
623 e74798c1 Michael Hanselmann
    for node in nodes:
624 e74798c1 Michael Hanselmann
      if result[node]:
625 e74798c1 Michael Hanselmann
        success.append(node)
626 e74798c1 Michael Hanselmann
      else:
627 e74798c1 Michael Hanselmann
        failed.append(node)
628 e74798c1 Michael Hanselmann
629 e74798c1 Michael Hanselmann
    if failed:
630 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
631 e74798c1 Michael Hanselmann
632 e74798c1 Michael Hanselmann
    # +1 for the master node
633 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
634 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
635 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
636 e74798c1 Michael Hanselmann
637 99aabbed Iustin Pop
  def _GetNodeIp(self):
638 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
639 99aabbed Iustin Pop

640 ea03467c Iustin Pop
    @rtype: (list, list)
641 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
642 ea03467c Iustin Pop
        names and the second one with the node addresses
643 ea03467c Iustin Pop

644 99aabbed Iustin Pop
    """
645 99aabbed Iustin Pop
    name_list = self._nodes.keys()
646 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
647 99aabbed Iustin Pop
    return name_list, addr_list
648 99aabbed Iustin Pop
649 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Stores a file on this node and pushes it to all other nodes.

    The local file is written first; afterwards the same contents are
    sent to every node we know about and the result of that call is
    checked (failures are only logged).

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    # Local copy first ...
    utils.WriteFile(file_name, data=data)

    # ... then the remote ones
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    what = "Updating %s" % file_name
    self._CheckRpcResult(rpc_result, self._nodes, what)
667 23752136 Michael Hanselmann
668 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
    """Moves a file on this node and repeats the move on all other nodes.

    The local rename is done first; afterwards every node we know about
    is asked to do the same rename and the result of that call is
    checked (failures are only logged).

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    # Local rename first ...
    os.rename(old, new)

    # ... then the remote ones
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    old, new)
    what = "Moving %s to %s" % (old, new)
    self._CheckRpcResult(rpc_result, self._nodes, what)
686 abc1f2ce Michael Hanselmann
687 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
    """Turns a numeric job ID into its string representation.

    For now the result is simply C{str(job_id)}; the method exists so
    that the validation and a possible future format change live in a
    single place.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer
        or is negative

    """
    numeric = isinstance(job_id, (int, long))
    if numeric and job_id >= 0:
      return str(job_id)

    if not numeric:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)
706 85f03e0d Michael Hanselmann
707 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
    """Allocates the next job identifier.

    The new serial number is written to the serial file (and replicated)
    before it is remembered in memory, so a failed write leaves the
    in-memory counter untouched.

    @rtype: str
    @return: a string representing the job identifier.

    """
    next_serial = self._last_serial + 1
    contents = "%s\n" % next_serial

    # Persist and replicate the new value
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        contents)

    # Only reached if the write above didn't raise
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
727 f1da30e6 Michael Hanselmann
728 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Computes the path of the file holding a (non-archived) job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)
739 f1da30e6 Michael Hanselmann
740 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Computes the path of the file holding an archived job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, file_name)
751 0cb94105 Michael Hanselmann
752 85f03e0d Michael Hanselmann
  @classmethod
753 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
754 ea03467c Iustin Pop
    """Extract the job id from a filename.
755 ea03467c Iustin Pop

756 ea03467c Iustin Pop
    @type name: str
757 ea03467c Iustin Pop
    @param name: the job filename
758 ea03467c Iustin Pop
    @rtype: job id or None
759 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
760 ea03467c Iustin Pop
        or None if the filename does not represent a valid
761 ea03467c Iustin Pop
        job file
762 ea03467c Iustin Pop

763 ea03467c Iustin Pop
    """
764 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
765 fae737ac Michael Hanselmann
    if m:
766 fae737ac Michael Hanselmann
      return m.group(1)
767 fae737ac Michael Hanselmann
    else:
768 fae737ac Michael Hanselmann
      return None
769 fae737ac Michael Hanselmann
770 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    The IDs are taken from the names of the job files on disk; this is
    enough because every job is required to be present on disk (so the
    _memcache never holds extra IDs).

    The C{archived} parameter is meant to request the inclusion of
    archived jobs, but it is currently unused.

    @rtype: list
    @return: the list of job IDs, as sorted by L{utils.NiceSort}

    """
    ids = []
    for fname in self._ListJobFiles():
      ids.append(self._ExtractJobID(fname))
    return utils.NiceSort(ids)
787 911a495b Iustin Pop
788 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
    """Lists the job files found in the queue directory.

    @rtype: list
    @return: the names of the visible files in the queue directory
        which match the job file pattern

    """
    job_files = []
    for entry in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(entry):
        job_files.append(entry)
    return job_files
797 f1da30e6 Michael Hanselmann
798 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
799 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
800 ea03467c Iustin Pop

801 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
802 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
803 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
804 ea03467c Iustin Pop

805 ea03467c Iustin Pop
    @param job_id: the job id
806 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
807 ea03467c Iustin Pop
    @return: either None or the job object
808 ea03467c Iustin Pop

809 ea03467c Iustin Pop
    """
810 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
811 5685c1a5 Michael Hanselmann
    if job:
812 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
813 5685c1a5 Michael Hanselmann
      return job
814 ac0930b9 Iustin Pop
815 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
816 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
817 f1da30e6 Michael Hanselmann
    try:
818 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
819 f1da30e6 Michael Hanselmann
    except IOError, err:
820 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
821 f1da30e6 Michael Hanselmann
        return None
822 f1da30e6 Michael Hanselmann
      raise
823 f1da30e6 Michael Hanselmann
    try:
824 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
825 f1da30e6 Michael Hanselmann
    finally:
826 f1da30e6 Michael Hanselmann
      fd.close()
827 f1da30e6 Michael Hanselmann
828 94ed59a5 Iustin Pop
    try:
829 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
830 94ed59a5 Iustin Pop
    except Exception, err:
831 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
832 94ed59a5 Iustin Pop
      if filepath == new_path:
833 94ed59a5 Iustin Pop
        # job already archived (future case)
834 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
835 94ed59a5 Iustin Pop
      else:
836 94ed59a5 Iustin Pop
        # non-archived case
837 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
838 94ed59a5 Iustin Pop
        self._RenameFileUnlocked(filepath, new_path)
839 94ed59a5 Iustin Pop
      return None
840 94ed59a5 Iustin Pop
841 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
842 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
843 ac0930b9 Iustin Pop
    return job
844 f1da30e6 Michael Hanselmann
845 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
846 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
847 ea03467c Iustin Pop

848 ea03467c Iustin Pop
    @type job_ids: list
849 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
850 ea03467c Iustin Pop
        or a list of job IDs
851 ea03467c Iustin Pop
    @rtype: list
852 ea03467c Iustin Pop
    @return: the list of job objects
853 ea03467c Iustin Pop

854 ea03467c Iustin Pop
    """
855 911a495b Iustin Pop
    if not job_ids:
856 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
857 f1da30e6 Michael Hanselmann
858 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
859 f1da30e6 Michael Hanselmann
860 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    The check is done via the existence of the queue drain file, which
    makes this a per-node flag. In the future this can be moved to the
    config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
872 686d7433 Iustin Pop
873 3ccafd0e Iustin Pop
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets or clears the drain flag of the queue.

    The flag is represented by the (empty) queue drain file. This is
    similar to the function L{backend.JobQueueSetDrainFlag}, and in the
    future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    if not drain_flag:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    else:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    return True
889 3ccafd0e Iustin Pop
890 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    A new job ID is allocated, the job is written to disk, added to
    the in-memory cache and finally handed over to the worker pool so
    that it gets picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Allocate an identifier and build the job object
    new_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, new_id, ops)

    # Store it on disk before anything else knows about it
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # Hand it over to the workers
    self._wpool.AddTask(new_job)

    return new_job.id
921 f1da30e6 Michael Hanselmann
922 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Writes the current state of a job to its job file.

    This has to be called after a job has been modified: the job is
    serialized, written to disk and replicated to the other nodes, and
    everybody waiting on the job's change condition is notified.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)

    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Wake up anybody waiting for changes of this job
    job.change.notifyAll()
941 dfe57c22 Michael Hanselmann
942 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries) or None
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

        if the job could not be loaded and no information about it
        has been gathered, None is returned

    """
    logging.debug("Waiting for changes in job %s", job_id)

    # Defined up-front: when the job can't be loaded on the first pass, the
    # loop is left before either of these is computed
    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      # The job was never loaded, so there is nothing to report
      return None

    return (job_info, log_entries)
1011 dfe57c22 Michael Hanselmann
1012 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job which is still waiting in the queue.

    Jobs which cannot be found or which are not in the queued state
    are left alone. Otherwise all opcodes of the job are marked as
    failed and the job is written back to disk.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    status = job.CalcStatus()
    if status != constants.JOB_STATUS_QUEUED:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for opcode in job.ops:
        opcode.status = constants.OP_STATUS_ERROR
        opcode.result = "Job cancelled by request"
    finally:
      # Whatever was changed so far must end up on disk
      self.UpdateJobUnlocked(job)
1040 188c5e0a Michael Hanselmann
1041 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Moves the file of a finished job into the archive directory.

    Nothing is done for jobs which cannot be found or whose status is
    not one of canceled, success or error.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    status = job.CalcStatus()
    finished = (constants.JOB_STATUS_CANCELED,
                constants.JOB_STATUS_SUCCESS,
                constants.JOB_STATUS_ERROR)
    if status not in finished:
      logging.debug("Job %s is not yet done", job.id)
      return

    # Rename locally and on all other nodes
    self._RenameFileUnlocked(self._GetJobPath(job.id),
                             self._GetArchivedJobPath(job.id))

    logging.debug("Successfully archived job %s", job.id)
1068 f1da30e6 Michael Hanselmann
1069 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    Locked entry point which delegates all the work to
    L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    result = self._ArchiveJobUnlocked(job_id)
    return result
1081 07cd723a Iustin Pop
1082 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all finished jobs which are older than the
    age parameter. The age of a job is computed from its end
    timestamp; if that is missing the start timestamp is used, and if
    that is missing too, the received timestamp. The special '-1' age
    will cause archival of all jobs (that are not running or queued).

    Jobs which cannot be loaded are skipped.

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job is None:
        # The job file vanished or could not be parsed; nothing to archive
        continue

      # CalcStatus() is a job status, hence the JOB_STATUS_* constants
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # Only the first element of the timestamp is compared with the
      # current time
      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
1115 07cd723a Iustin Pop
1116 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1117 ea03467c Iustin Pop
    """Returns information about a job.
1118 ea03467c Iustin Pop

1119 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1120 ea03467c Iustin Pop
    @param job: the job which we query
1121 ea03467c Iustin Pop
    @type fields: list
1122 ea03467c Iustin Pop
    @param fields: names of fields to return
1123 ea03467c Iustin Pop
    @rtype: list
1124 ea03467c Iustin Pop
    @return: list with one element for each field
1125 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1126 ea03467c Iustin Pop
        has been passed
1127 ea03467c Iustin Pop

1128 ea03467c Iustin Pop
    """
1129 e2715f69 Michael Hanselmann
    row = []
1130 e2715f69 Michael Hanselmann
    for fname in fields:
1131 e2715f69 Michael Hanselmann
      if fname == "id":
1132 e2715f69 Michael Hanselmann
        row.append(job.id)
1133 e2715f69 Michael Hanselmann
      elif fname == "status":
1134 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1135 af30b2fd Michael Hanselmann
      elif fname == "ops":
1136 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1137 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1138 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1139 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1140 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1141 5b23c34c Iustin Pop
      elif fname == "oplog":
1142 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1143 c56ec146 Iustin Pop
      elif fname == "opstart":
1144 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1145 c56ec146 Iustin Pop
      elif fname == "opend":
1146 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1147 c56ec146 Iustin Pop
      elif fname == "received_ts":
1148 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1149 c56ec146 Iustin Pop
      elif fname == "start_ts":
1150 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1151 c56ec146 Iustin Pop
      elif fname == "end_ts":
1152 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1153 60dd1473 Iustin Pop
      elif fname == "summary":
1154 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1155 e2715f69 Michael Hanselmann
      else:
1156 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1157 e2715f69 Michael Hanselmann
    return row
1158 e2715f69 Michael Hanselmann
1159 85f03e0d Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns the requested fields for a list of jobs.

    The jobs are fetched through L{_GetJobsUnlocked} and each of them
    is converted into a row by L{_GetJobInfoUnlocked}. An entry for
    which no job was returned (C{None}) results in C{None} in the
    output as well.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being either
        a list with the requested fields or None

    """
    result = []

    for job in self._GetJobsUnlocked(job_ids):
      info = None
      if job is not None:
        info = self._GetJobInfoUnlocked(job, fields)
      result.append(info)

    return result
1185 e2715f69 Michael Hanselmann
1186 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1187 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1188 e2715f69 Michael Hanselmann
  def Shutdown(self):
1189 e2715f69 Michael Hanselmann
    """Stops the job queue.
1190 e2715f69 Michael Hanselmann

1191 ea03467c Iustin Pop
    This shuts down all the worker threads and closes the queue.
1192 ea03467c Iustin Pop

1193 e2715f69 Michael Hanselmann
    """
1194 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1195 85f03e0d Michael Hanselmann
1196 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1197 04ab05ce Michael Hanselmann
    self._queue_lock = None