Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b7cb9024

History | View | Annotate | Download (35.8 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
52 e2715f69 Michael Hanselmann
53 498ae1cc Iustin Pop
54 fbf0262f Michael Hanselmann
class CancelJob:
55 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
56 fbf0262f Michael Hanselmann

57 fbf0262f Michael Hanselmann
  """
58 fbf0262f Michael Hanselmann
59 fbf0262f Michael Hanselmann
60 70552c46 Michael Hanselmann
def TimeStampNow():
61 ea03467c Iustin Pop
  """Returns the current timestamp.
62 ea03467c Iustin Pop

63 ea03467c Iustin Pop
  @rtype: tuple
64 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
65 ea03467c Iustin Pop

66 ea03467c Iustin Pop
  """
67 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
68 70552c46 Michael Hanselmann
69 70552c46 Michael Hanselmann
70 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
71 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
72 e2715f69 Michael Hanselmann

73 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
74 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
75 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
76 ea03467c Iustin Pop
  @ivar status: the current status
77 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
78 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
79 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
80 f1048938 Iustin Pop

81 e2715f69 Michael Hanselmann
  """
82 85f03e0d Michael Hanselmann
  def __init__(self, op):
83 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
84 ea03467c Iustin Pop

85 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
86 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
87 ea03467c Iustin Pop

88 ea03467c Iustin Pop
    """
89 85f03e0d Michael Hanselmann
    self.input = op
90 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
91 85f03e0d Michael Hanselmann
    self.result = None
92 85f03e0d Michael Hanselmann
    self.log = []
93 70552c46 Michael Hanselmann
    self.start_timestamp = None
94 70552c46 Michael Hanselmann
    self.end_timestamp = None
95 f1da30e6 Michael Hanselmann
96 f1da30e6 Michael Hanselmann
  @classmethod
97 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
98 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
99 ea03467c Iustin Pop

100 ea03467c Iustin Pop
    @type state: dict
101 ea03467c Iustin Pop
    @param state: the serialized state
102 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
103 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    """
106 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
107 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
108 85f03e0d Michael Hanselmann
    obj.status = state["status"]
109 85f03e0d Michael Hanselmann
    obj.result = state["result"]
110 85f03e0d Michael Hanselmann
    obj.log = state["log"]
111 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
112 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
113 f1da30e6 Michael Hanselmann
    return obj
114 f1da30e6 Michael Hanselmann
115 f1da30e6 Michael Hanselmann
  def Serialize(self):
116 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
117 ea03467c Iustin Pop

118 ea03467c Iustin Pop
    @rtype: dict
119 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
120 ea03467c Iustin Pop

121 ea03467c Iustin Pop
    """
122 6c5a7090 Michael Hanselmann
    return {
123 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
124 6c5a7090 Michael Hanselmann
      "status": self.status,
125 6c5a7090 Michael Hanselmann
      "result": self.result,
126 6c5a7090 Michael Hanselmann
      "log": self.log,
127 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
128 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
129 6c5a7090 Michael Hanselmann
      }
130 f1048938 Iustin Pop
131 e2715f69 Michael Hanselmann
132 e2715f69 Michael Hanselmann
class _QueuedJob(object):
133 e2715f69 Michael Hanselmann
  """In-memory job representation.
134 e2715f69 Michael Hanselmann

135 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
136 ea03467c Iustin Pop
  be taken care of by users of this class.
137 ea03467c Iustin Pop

138 ea03467c Iustin Pop
  @type queue: L{JobQueue}
139 ea03467c Iustin Pop
  @ivar queue: the parent queue
140 ea03467c Iustin Pop
  @ivar id: the job ID
141 ea03467c Iustin Pop
  @type ops: list
142 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
143 ea03467c Iustin Pop
  @type run_op_index: int
144 ea03467c Iustin Pop
  @ivar run_op_index: the currently executing opcode, or -1 if
145 ea03467c Iustin Pop
      we didn't yet start executing
146 ea03467c Iustin Pop
  @type log_serial: int
147 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
148 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
149 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
150 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
151 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
152 e2715f69 Michael Hanselmann

153 e2715f69 Michael Hanselmann
  """
154 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
155 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
156 ea03467c Iustin Pop

157 ea03467c Iustin Pop
    @type queue: L{JobQueue}
158 ea03467c Iustin Pop
    @param queue: our parent queue
159 ea03467c Iustin Pop
    @type job_id: job_id
160 ea03467c Iustin Pop
    @param job_id: our job id
161 ea03467c Iustin Pop
    @type ops: list
162 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
163 ea03467c Iustin Pop
        in _QueuedOpCodes
164 ea03467c Iustin Pop

165 ea03467c Iustin Pop
    """
166 e2715f69 Michael Hanselmann
    if not ops:
167 ea03467c Iustin Pop
      # TODO: use a better exception
168 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
169 e2715f69 Michael Hanselmann
170 85f03e0d Michael Hanselmann
    self.queue = queue
171 f1da30e6 Michael Hanselmann
    self.id = job_id
172 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
173 85f03e0d Michael Hanselmann
    self.run_op_index = -1
174 6c5a7090 Michael Hanselmann
    self.log_serial = 0
175 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
176 c56ec146 Iustin Pop
    self.start_timestamp = None
177 c56ec146 Iustin Pop
    self.end_timestamp = None
178 6c5a7090 Michael Hanselmann
179 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
180 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
181 f1da30e6 Michael Hanselmann
182 f1da30e6 Michael Hanselmann
  @classmethod
183 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
184 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
185 ea03467c Iustin Pop

186 ea03467c Iustin Pop
    @type queue: L{JobQueue}
187 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
188 ea03467c Iustin Pop
    @type state: dict
189 ea03467c Iustin Pop
    @param state: the serialized state
190 ea03467c Iustin Pop
    @rtype: _JobQueue
191 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
192 ea03467c Iustin Pop

193 ea03467c Iustin Pop
    """
194 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
195 85f03e0d Michael Hanselmann
    obj.queue = queue
196 85f03e0d Michael Hanselmann
    obj.id = state["id"]
197 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
198 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
199 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
200 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
201 6c5a7090 Michael Hanselmann
202 6c5a7090 Michael Hanselmann
    obj.ops = []
203 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
204 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
205 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
206 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
207 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
208 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
209 6c5a7090 Michael Hanselmann
210 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
211 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
212 6c5a7090 Michael Hanselmann
213 f1da30e6 Michael Hanselmann
    return obj
214 f1da30e6 Michael Hanselmann
215 f1da30e6 Michael Hanselmann
  def Serialize(self):
216 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
217 ea03467c Iustin Pop

218 ea03467c Iustin Pop
    @rtype: dict
219 ea03467c Iustin Pop
    @return: the serialized state
220 ea03467c Iustin Pop

221 ea03467c Iustin Pop
    """
222 f1da30e6 Michael Hanselmann
    return {
223 f1da30e6 Michael Hanselmann
      "id": self.id,
224 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
225 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
226 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
227 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
228 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
229 f1da30e6 Michael Hanselmann
      }
230 f1da30e6 Michael Hanselmann
231 85f03e0d Michael Hanselmann
  def CalcStatus(self):
232 ea03467c Iustin Pop
    """Compute the status of this job.
233 ea03467c Iustin Pop

234 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
235 ea03467c Iustin Pop
    based on their status, computes the job status.
236 ea03467c Iustin Pop

237 ea03467c Iustin Pop
    The algorithm is:
238 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
239 ea03467c Iustin Pop
        status will be the same
240 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
241 ea03467c Iustin Pop
          - waitlock
242 fbf0262f Michael Hanselmann
          - canceling
243 ea03467c Iustin Pop
          - running
244 ea03467c Iustin Pop

245 ea03467c Iustin Pop
        will determine the job status
246 ea03467c Iustin Pop

247 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
248 ea03467c Iustin Pop
        and the job status will be the same
249 ea03467c Iustin Pop

250 ea03467c Iustin Pop
    @return: the job status
251 ea03467c Iustin Pop

252 ea03467c Iustin Pop
    """
253 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
254 e2715f69 Michael Hanselmann
255 e2715f69 Michael Hanselmann
    all_success = True
256 85f03e0d Michael Hanselmann
    for op in self.ops:
257 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
258 e2715f69 Michael Hanselmann
        continue
259 e2715f69 Michael Hanselmann
260 e2715f69 Michael Hanselmann
      all_success = False
261 e2715f69 Michael Hanselmann
262 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
263 e2715f69 Michael Hanselmann
        pass
264 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
265 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
266 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
267 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
268 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
269 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
270 fbf0262f Michael Hanselmann
        break
271 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
272 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
273 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
274 f1da30e6 Michael Hanselmann
        break
275 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
276 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
277 4cb1d919 Michael Hanselmann
        break
278 e2715f69 Michael Hanselmann
279 e2715f69 Michael Hanselmann
    if all_success:
280 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
281 e2715f69 Michael Hanselmann
282 e2715f69 Michael Hanselmann
    return status
283 e2715f69 Michael Hanselmann
284 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
285 ea03467c Iustin Pop
    """Selectively returns the log entries.
286 ea03467c Iustin Pop

287 ea03467c Iustin Pop
    @type newer_than: None or int
288 ea03467c Iustin Pop
    @param newer_than: if this is None, return all log enties,
289 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
290 ea03467c Iustin Pop
        than this value
291 ea03467c Iustin Pop
    @rtype: list
292 ea03467c Iustin Pop
    @return: the list of the log entries selected
293 ea03467c Iustin Pop

294 ea03467c Iustin Pop
    """
295 6c5a7090 Michael Hanselmann
    if newer_than is None:
296 6c5a7090 Michael Hanselmann
      serial = -1
297 6c5a7090 Michael Hanselmann
    else:
298 6c5a7090 Michael Hanselmann
      serial = newer_than
299 6c5a7090 Michael Hanselmann
300 6c5a7090 Michael Hanselmann
    entries = []
301 6c5a7090 Michael Hanselmann
    for op in self.ops:
302 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
303 6c5a7090 Michael Hanselmann
304 6c5a7090 Michael Hanselmann
    return entries
305 6c5a7090 Michael Hanselmann
306 f1048938 Iustin Pop
307 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
308 ea03467c Iustin Pop
  """The actual job workers.
309 ea03467c Iustin Pop

310 ea03467c Iustin Pop
  """
311 e92376d7 Iustin Pop
  def _NotifyStart(self):
312 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
313 e92376d7 Iustin Pop

314 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
315 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
316 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
317 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
318 e92376d7 Iustin Pop

319 e92376d7 Iustin Pop
    """
320 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
321 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
322 e92376d7 Iustin Pop
323 e92376d7 Iustin Pop
    self.queue.acquire()
324 e92376d7 Iustin Pop
    try:
325 fbf0262f Michael Hanselmann
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
326 fbf0262f Michael Hanselmann
                                    constants.OP_STATUS_CANCELING)
327 fbf0262f Michael Hanselmann
328 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
329 fbf0262f Michael Hanselmann
      if self.opcode.status == constants.OP_STATUS_CANCELING:
330 fbf0262f Michael Hanselmann
        raise CancelJob()
331 fbf0262f Michael Hanselmann
332 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
333 e92376d7 Iustin Pop
    finally:
334 e92376d7 Iustin Pop
      self.queue.release()
335 e92376d7 Iustin Pop
336 85f03e0d Michael Hanselmann
  def RunTask(self, job):
337 e2715f69 Michael Hanselmann
    """Job executor.
338 e2715f69 Michael Hanselmann

339 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
340 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
341 e2715f69 Michael Hanselmann

342 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
343 ea03467c Iustin Pop
    @param job: the job to be processed
344 ea03467c Iustin Pop

345 e2715f69 Michael Hanselmann
    """
346 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
347 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
348 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
349 e92376d7 Iustin Pop
    self.queue = queue = job.queue
350 e2715f69 Michael Hanselmann
    try:
351 85f03e0d Michael Hanselmann
      try:
352 85f03e0d Michael Hanselmann
        count = len(job.ops)
353 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
354 85f03e0d Michael Hanselmann
          try:
355 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
356 85f03e0d Michael Hanselmann
357 85f03e0d Michael Hanselmann
            queue.acquire()
358 85f03e0d Michael Hanselmann
            try:
359 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
360 85f03e0d Michael Hanselmann
              job.run_op_index = idx
361 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
362 85f03e0d Michael Hanselmann
              op.result = None
363 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
364 c56ec146 Iustin Pop
              if idx == 0: # first opcode
365 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
366 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
367 85f03e0d Michael Hanselmann
368 38206f3c Iustin Pop
              input_opcode = op.input
369 85f03e0d Michael Hanselmann
            finally:
370 85f03e0d Michael Hanselmann
              queue.release()
371 85f03e0d Michael Hanselmann
372 dfe57c22 Michael Hanselmann
            def _Log(*args):
373 6c5a7090 Michael Hanselmann
              """Append a log entry.
374 6c5a7090 Michael Hanselmann

375 6c5a7090 Michael Hanselmann
              """
376 6c5a7090 Michael Hanselmann
              assert len(args) < 3
377 6c5a7090 Michael Hanselmann
378 6c5a7090 Michael Hanselmann
              if len(args) == 1:
379 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
380 6c5a7090 Michael Hanselmann
                log_msg = args[0]
381 6c5a7090 Michael Hanselmann
              else:
382 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
383 6c5a7090 Michael Hanselmann
384 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
385 6c5a7090 Michael Hanselmann
              # precision.
386 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
387 dfe57c22 Michael Hanselmann
388 6c5a7090 Michael Hanselmann
              queue.acquire()
389 dfe57c22 Michael Hanselmann
              try:
390 6c5a7090 Michael Hanselmann
                job.log_serial += 1
391 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
392 6c5a7090 Michael Hanselmann
393 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
394 dfe57c22 Michael Hanselmann
              finally:
395 6c5a7090 Michael Hanselmann
                queue.release()
396 dfe57c22 Michael Hanselmann
397 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
398 e92376d7 Iustin Pop
            self.opcode = op
399 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
400 85f03e0d Michael Hanselmann
401 85f03e0d Michael Hanselmann
            queue.acquire()
402 85f03e0d Michael Hanselmann
            try:
403 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
404 85f03e0d Michael Hanselmann
              op.result = result
405 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
406 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
407 85f03e0d Michael Hanselmann
            finally:
408 85f03e0d Michael Hanselmann
              queue.release()
409 85f03e0d Michael Hanselmann
410 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
411 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
412 fbf0262f Michael Hanselmann
          except CancelJob:
413 fbf0262f Michael Hanselmann
            # Will be handled further up
414 fbf0262f Michael Hanselmann
            raise
415 85f03e0d Michael Hanselmann
          except Exception, err:
416 85f03e0d Michael Hanselmann
            queue.acquire()
417 85f03e0d Michael Hanselmann
            try:
418 85f03e0d Michael Hanselmann
              try:
419 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
420 85f03e0d Michael Hanselmann
                op.result = str(err)
421 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
422 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
423 85f03e0d Michael Hanselmann
              finally:
424 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
425 85f03e0d Michael Hanselmann
            finally:
426 85f03e0d Michael Hanselmann
              queue.release()
427 85f03e0d Michael Hanselmann
            raise
428 85f03e0d Michael Hanselmann
429 fbf0262f Michael Hanselmann
      except CancelJob:
430 fbf0262f Michael Hanselmann
        queue.acquire()
431 fbf0262f Michael Hanselmann
        try:
432 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
433 fbf0262f Michael Hanselmann
        finally:
434 fbf0262f Michael Hanselmann
          queue.release()
435 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
436 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
437 85f03e0d Michael Hanselmann
      except:
438 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
439 e2715f69 Michael Hanselmann
    finally:
440 85f03e0d Michael Hanselmann
      queue.acquire()
441 85f03e0d Michael Hanselmann
      try:
442 65548ed5 Michael Hanselmann
        try:
443 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
444 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
445 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
446 65548ed5 Michael Hanselmann
        finally:
447 65548ed5 Michael Hanselmann
          job_id = job.id
448 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
449 85f03e0d Michael Hanselmann
      finally:
450 85f03e0d Michael Hanselmann
        queue.release()
451 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
452 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
453 e2715f69 Michael Hanselmann
454 e2715f69 Michael Hanselmann
455 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
456 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
457 ea03467c Iustin Pop

458 ea03467c Iustin Pop
  """
459 5bdce580 Michael Hanselmann
  def __init__(self, queue):
460 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
461 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
462 5bdce580 Michael Hanselmann
    self.queue = queue
463 e2715f69 Michael Hanselmann
464 e2715f69 Michael Hanselmann
465 85f03e0d Michael Hanselmann
class JobQueue(object):
466 ea03467c Iustin Pop
  """Quue used to manaage the jobs.
467 ea03467c Iustin Pop

468 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
469 ea03467c Iustin Pop

470 ea03467c Iustin Pop
  """
471 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
472 f1da30e6 Michael Hanselmann
473 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
474 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
475 db37da70 Michael Hanselmann

476 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
477 ea03467c Iustin Pop
    functions usually called from other classes.
478 db37da70 Michael Hanselmann

479 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
480 db37da70 Michael Hanselmann

481 ea03467c Iustin Pop
    Example::
482 db37da70 Michael Hanselmann
      @utils.LockedMethod
483 db37da70 Michael Hanselmann
      @_RequireOpenQueue
484 db37da70 Michael Hanselmann
      def Example(self):
485 db37da70 Michael Hanselmann
        pass
486 db37da70 Michael Hanselmann

487 db37da70 Michael Hanselmann
    """
488 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
489 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
490 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
491 db37da70 Michael Hanselmann
    return wrapper
492 db37da70 Michael Hanselmann
493 85f03e0d Michael Hanselmann
  def __init__(self, context):
494 ea03467c Iustin Pop
    """Constructor for JobQueue.
495 ea03467c Iustin Pop

496 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
497 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
498 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
499 ea03467c Iustin Pop
    running).
500 ea03467c Iustin Pop

501 ea03467c Iustin Pop
    @type context: GanetiContext
502 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
503 ea03467c Iustin Pop
        data and other ganeti objects
504 ea03467c Iustin Pop

505 ea03467c Iustin Pop
    """
506 5bdce580 Michael Hanselmann
    self.context = context
507 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
508 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
509 f1da30e6 Michael Hanselmann
510 85f03e0d Michael Hanselmann
    # Locking
511 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
512 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
513 85f03e0d Michael Hanselmann
    self.release = self._lock.release
514 85f03e0d Michael Hanselmann
515 04ab05ce Michael Hanselmann
    # Initialize
516 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
517 f1da30e6 Michael Hanselmann
518 04ab05ce Michael Hanselmann
    # Read serial file
519 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
520 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
521 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
522 c4beba1c Iustin Pop
523 23752136 Michael Hanselmann
    # Get initial list of nodes
524 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
525 99aabbed Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values())
526 8e00939c Michael Hanselmann
527 8e00939c Michael Hanselmann
    # Remove master node
528 8e00939c Michael Hanselmann
    try:
529 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
530 33987705 Iustin Pop
    except KeyError:
531 8e00939c Michael Hanselmann
      pass
532 23752136 Michael Hanselmann
533 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
534 23752136 Michael Hanselmann
535 85f03e0d Michael Hanselmann
    # Setup worker pool
536 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
537 85f03e0d Michael Hanselmann
    try:
538 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
539 16714921 Michael Hanselmann
      # we're still doing our work.
540 16714921 Michael Hanselmann
      self.acquire()
541 16714921 Michael Hanselmann
      try:
542 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
543 711b5124 Michael Hanselmann
544 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
545 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
546 711b5124 Michael Hanselmann
        lastinfo = time.time()
547 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
548 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
549 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
550 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
551 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
552 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
553 711b5124 Michael Hanselmann
            lastinfo = time.time()
554 711b5124 Michael Hanselmann
555 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
556 711b5124 Michael Hanselmann
557 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
558 16714921 Michael Hanselmann
          if job is None:
559 16714921 Michael Hanselmann
            continue
560 94ed59a5 Iustin Pop
561 16714921 Michael Hanselmann
          status = job.CalcStatus()
562 85f03e0d Michael Hanselmann
563 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
564 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
565 85f03e0d Michael Hanselmann
566 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
567 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
568 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
569 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
570 16714921 Michael Hanselmann
            try:
571 16714921 Michael Hanselmann
              for op in job.ops:
572 16714921 Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
573 16714921 Michael Hanselmann
                op.result = "Unclean master daemon shutdown"
574 16714921 Michael Hanselmann
            finally:
575 16714921 Michael Hanselmann
              self.UpdateJobUnlocked(job)
576 711b5124 Michael Hanselmann
577 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
578 16714921 Michael Hanselmann
      finally:
579 16714921 Michael Hanselmann
        self.release()
580 16714921 Michael Hanselmann
    except:
581 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
582 16714921 Michael Hanselmann
      raise
583 85f03e0d Michael Hanselmann
584 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
585 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
586 99aabbed Iustin Pop
  def AddNode(self, node):
587 99aabbed Iustin Pop
    """Register a new node with the queue.
588 99aabbed Iustin Pop

589 99aabbed Iustin Pop
    @type node: L{objects.Node}
590 99aabbed Iustin Pop
    @param node: the node object to be added
591 99aabbed Iustin Pop

592 99aabbed Iustin Pop
    """
593 99aabbed Iustin Pop
    node_name = node.name
594 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
595 23752136 Michael Hanselmann
596 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
597 a3811745 Michael Hanselmann
    rpc.RpcRunner.call_jobqueue_purge(node_name)
598 23752136 Michael Hanselmann
599 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
600 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
601 23752136 Michael Hanselmann
602 d2e03a33 Michael Hanselmann
    # Upload current serial file
603 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
604 d2e03a33 Michael Hanselmann
605 d2e03a33 Michael Hanselmann
    for file_name in files:
606 9f774ee8 Michael Hanselmann
      # Read file content
607 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
608 9f774ee8 Michael Hanselmann
      try:
609 9f774ee8 Michael Hanselmann
        content = fd.read()
610 9f774ee8 Michael Hanselmann
      finally:
611 9f774ee8 Michael Hanselmann
        fd.close()
612 9f774ee8 Michael Hanselmann
613 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
614 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
615 a3811745 Michael Hanselmann
                                                  file_name, content)
616 d2e03a33 Michael Hanselmann
      if not result[node_name]:
617 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
618 d2e03a33 Michael Hanselmann
619 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
620 d2e03a33 Michael Hanselmann
621 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
622 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
623 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
624 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
625 ea03467c Iustin Pop

626 ea03467c Iustin Pop
    @type node_name: str
627 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
628 ea03467c Iustin Pop

629 ea03467c Iustin Pop
    """
630 23752136 Michael Hanselmann
    try:
631 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
632 99aabbed Iustin Pop
      del self._nodes[node_name]
633 d2e03a33 Michael Hanselmann
    except KeyError:
634 23752136 Michael Hanselmann
      pass
635 23752136 Michael Hanselmann
636 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
637 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
638 ea03467c Iustin Pop

639 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
640 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
641 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
642 ea03467c Iustin Pop

643 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
644 ea03467c Iustin Pop
    @type nodes: list
645 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
646 ea03467c Iustin Pop
    @type failmsg: str
647 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
648 ea03467c Iustin Pop

649 ea03467c Iustin Pop
    """
650 e74798c1 Michael Hanselmann
    failed = []
651 e74798c1 Michael Hanselmann
    success = []
652 e74798c1 Michael Hanselmann
653 e74798c1 Michael Hanselmann
    for node in nodes:
654 e74798c1 Michael Hanselmann
      if result[node]:
655 e74798c1 Michael Hanselmann
        success.append(node)
656 e74798c1 Michael Hanselmann
      else:
657 e74798c1 Michael Hanselmann
        failed.append(node)
658 e74798c1 Michael Hanselmann
659 e74798c1 Michael Hanselmann
    if failed:
660 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
661 e74798c1 Michael Hanselmann
662 e74798c1 Michael Hanselmann
    # +1 for the master node
663 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
664 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
665 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
666 e74798c1 Michael Hanselmann
667 99aabbed Iustin Pop
  def _GetNodeIp(self):
668 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
669 99aabbed Iustin Pop

670 ea03467c Iustin Pop
    @rtype: (list, list)
671 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
672 ea03467c Iustin Pop
        names and the second one with the node addresses
673 ea03467c Iustin Pop

674 99aabbed Iustin Pop
    """
675 99aabbed Iustin Pop
    name_list = self._nodes.keys()
676 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
677 99aabbed Iustin Pop
    return name_list, addr_list
678 99aabbed Iustin Pop
679 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
680 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
681 8e00939c Michael Hanselmann

682 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
683 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
684 ea03467c Iustin Pop

685 ea03467c Iustin Pop
    @type file_name: str
686 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
687 ea03467c Iustin Pop
    @type data: str
688 ea03467c Iustin Pop
    @param data: the new contents of the file
689 ea03467c Iustin Pop

690 8e00939c Michael Hanselmann
    """
691 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
692 8e00939c Michael Hanselmann
693 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
694 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
695 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
696 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
697 23752136 Michael Hanselmann
698 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
699 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
700 ea03467c Iustin Pop

701 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
702 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
703 ea03467c Iustin Pop

704 ea03467c Iustin Pop
    @type old: str
705 ea03467c Iustin Pop
    @param old: the current name of the file
706 ea03467c Iustin Pop
    @type new: str
707 ea03467c Iustin Pop
    @param new: the new name of the file
708 ea03467c Iustin Pop

709 ea03467c Iustin Pop
    """
710 abc1f2ce Michael Hanselmann
    os.rename(old, new)
711 abc1f2ce Michael Hanselmann
712 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
713 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
714 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
715 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
716 abc1f2ce Michael Hanselmann
717 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
718 ea03467c Iustin Pop
    """Convert a job ID to string format.
719 ea03467c Iustin Pop

720 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
721 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
722 ea03467c Iustin Pop
    abstract this change.
723 ea03467c Iustin Pop

724 ea03467c Iustin Pop
    @type job_id: int or long
725 ea03467c Iustin Pop
    @param job_id: the numeric job id
726 ea03467c Iustin Pop
    @rtype: str
727 ea03467c Iustin Pop
    @return: the formatted job id
728 ea03467c Iustin Pop

729 ea03467c Iustin Pop
    """
730 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
731 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
732 85f03e0d Michael Hanselmann
    if job_id < 0:
733 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
734 85f03e0d Michael Hanselmann
735 85f03e0d Michael Hanselmann
    return str(job_id)
736 85f03e0d Michael Hanselmann
737 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
738 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
739 f1da30e6 Michael Hanselmann

740 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
741 f1da30e6 Michael Hanselmann

742 ea03467c Iustin Pop
    @rtype: str
743 ea03467c Iustin Pop
    @return: a string representing the job identifier.
744 f1da30e6 Michael Hanselmann

745 f1da30e6 Michael Hanselmann
    """
746 f1da30e6 Michael Hanselmann
    # New number
747 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
748 f1da30e6 Michael Hanselmann
749 f1da30e6 Michael Hanselmann
    # Write to file
750 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
751 23752136 Michael Hanselmann
                                        "%s\n" % serial)
752 f1da30e6 Michael Hanselmann
753 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
754 f1da30e6 Michael Hanselmann
    self._last_serial = serial
755 f1da30e6 Michael Hanselmann
756 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
757 f1da30e6 Michael Hanselmann
758 85f03e0d Michael Hanselmann
  @staticmethod
759 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
760 ea03467c Iustin Pop
    """Returns the job file for a given job id.
761 ea03467c Iustin Pop

762 ea03467c Iustin Pop
    @type job_id: str
763 ea03467c Iustin Pop
    @param job_id: the job identifier
764 ea03467c Iustin Pop
    @rtype: str
765 ea03467c Iustin Pop
    @return: the path to the job file
766 ea03467c Iustin Pop

767 ea03467c Iustin Pop
    """
768 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
769 f1da30e6 Michael Hanselmann
770 85f03e0d Michael Hanselmann
  @staticmethod
771 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
772 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
773 ea03467c Iustin Pop

774 ea03467c Iustin Pop
    @type job_id: str
775 ea03467c Iustin Pop
    @param job_id: the job identifier
776 ea03467c Iustin Pop
    @rtype: str
777 ea03467c Iustin Pop
    @return: the path to the archived job file
778 ea03467c Iustin Pop

779 ea03467c Iustin Pop
    """
780 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
781 0cb94105 Michael Hanselmann
782 85f03e0d Michael Hanselmann
  @classmethod
783 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
784 ea03467c Iustin Pop
    """Extract the job id from a filename.
785 ea03467c Iustin Pop

786 ea03467c Iustin Pop
    @type name: str
787 ea03467c Iustin Pop
    @param name: the job filename
788 ea03467c Iustin Pop
    @rtype: job id or None
789 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
790 ea03467c Iustin Pop
        or None if the filename does not represent a valid
791 ea03467c Iustin Pop
        job file
792 ea03467c Iustin Pop

793 ea03467c Iustin Pop
    """
794 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
795 fae737ac Michael Hanselmann
    if m:
796 fae737ac Michael Hanselmann
      return m.group(1)
797 fae737ac Michael Hanselmann
    else:
798 fae737ac Michael Hanselmann
      return None
799 fae737ac Michael Hanselmann
800 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
801 911a495b Iustin Pop
    """Return all known job IDs.
802 911a495b Iustin Pop

803 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
804 911a495b Iustin Pop
    included. Currently this argument is unused.
805 911a495b Iustin Pop

806 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
807 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
808 ac0930b9 Iustin Pop
    extra IDs).
809 ac0930b9 Iustin Pop

810 ea03467c Iustin Pop
    @rtype: list
811 ea03467c Iustin Pop
    @return: the list of job IDs
812 ea03467c Iustin Pop

813 911a495b Iustin Pop
    """
814 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
815 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
816 f0d874fe Iustin Pop
    return jlist
817 911a495b Iustin Pop
818 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
819 ea03467c Iustin Pop
    """Returns the list of current job files.
820 ea03467c Iustin Pop

821 ea03467c Iustin Pop
    @rtype: list
822 ea03467c Iustin Pop
    @return: the list of job file names
823 ea03467c Iustin Pop

824 ea03467c Iustin Pop
    """
825 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
826 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
827 f1da30e6 Michael Hanselmann
828 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
829 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
830 ea03467c Iustin Pop

831 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
832 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
833 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
834 ea03467c Iustin Pop

835 ea03467c Iustin Pop
    @param job_id: the job id
836 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
837 ea03467c Iustin Pop
    @return: either None or the job object
838 ea03467c Iustin Pop

839 ea03467c Iustin Pop
    """
840 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
841 5685c1a5 Michael Hanselmann
    if job:
842 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
843 5685c1a5 Michael Hanselmann
      return job
844 ac0930b9 Iustin Pop
845 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
846 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
847 f1da30e6 Michael Hanselmann
    try:
848 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
849 f1da30e6 Michael Hanselmann
    except IOError, err:
850 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
851 f1da30e6 Michael Hanselmann
        return None
852 f1da30e6 Michael Hanselmann
      raise
853 f1da30e6 Michael Hanselmann
    try:
854 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
855 f1da30e6 Michael Hanselmann
    finally:
856 f1da30e6 Michael Hanselmann
      fd.close()
857 f1da30e6 Michael Hanselmann
858 94ed59a5 Iustin Pop
    try:
859 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
860 94ed59a5 Iustin Pop
    except Exception, err:
861 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
862 94ed59a5 Iustin Pop
      if filepath == new_path:
863 94ed59a5 Iustin Pop
        # job already archived (future case)
864 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
865 94ed59a5 Iustin Pop
      else:
866 94ed59a5 Iustin Pop
        # non-archived case
867 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
868 94ed59a5 Iustin Pop
        self._RenameFileUnlocked(filepath, new_path)
869 94ed59a5 Iustin Pop
      return None
870 94ed59a5 Iustin Pop
871 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
872 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
873 ac0930b9 Iustin Pop
    return job
874 f1da30e6 Michael Hanselmann
875 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
876 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
877 ea03467c Iustin Pop

878 ea03467c Iustin Pop
    @type job_ids: list
879 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
880 ea03467c Iustin Pop
        or a list of job IDs
881 ea03467c Iustin Pop
    @rtype: list
882 ea03467c Iustin Pop
    @return: the list of job objects
883 ea03467c Iustin Pop

884 ea03467c Iustin Pop
    """
885 911a495b Iustin Pop
    if not job_ids:
886 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
887 f1da30e6 Michael Hanselmann
888 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
889 f1da30e6 Michael Hanselmann
890 686d7433 Iustin Pop
  @staticmethod
891 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
892 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
893 686d7433 Iustin Pop

894 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
895 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
896 686d7433 Iustin Pop

897 ea03467c Iustin Pop
    @rtype: boolean
898 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
899 ea03467c Iustin Pop

900 686d7433 Iustin Pop
    """
901 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
902 686d7433 Iustin Pop
903 3ccafd0e Iustin Pop
  @staticmethod
904 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
905 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
906 3ccafd0e Iustin Pop

907 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
908 3ccafd0e Iustin Pop
    and in the future we might merge them.
909 3ccafd0e Iustin Pop

910 ea03467c Iustin Pop
    @type drain_flag: boolean
911 ea03467c Iustin Pop
    @param drain_flag: wheter to set or unset the drain flag
912 ea03467c Iustin Pop

913 3ccafd0e Iustin Pop
    """
914 3ccafd0e Iustin Pop
    if drain_flag:
915 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
916 3ccafd0e Iustin Pop
    else:
917 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
918 3ccafd0e Iustin Pop
    return True
919 3ccafd0e Iustin Pop
920 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
921 db37da70 Michael Hanselmann
  @_RequireOpenQueue
922 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
923 85f03e0d Michael Hanselmann
    """Create and store a new job.
924 f1da30e6 Michael Hanselmann

925 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
926 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
927 c3f0a12f Iustin Pop

928 c3f0a12f Iustin Pop
    @type ops: list
929 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
930 ea03467c Iustin Pop
    @rtype: job ID
931 ea03467c Iustin Pop
    @return: the job ID of the newly created job
932 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
933 c3f0a12f Iustin Pop

934 c3f0a12f Iustin Pop
    """
935 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
936 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
937 f1da30e6 Michael Hanselmann
    # Get job identifier
938 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
939 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
940 f1da30e6 Michael Hanselmann
941 f1da30e6 Michael Hanselmann
    # Write to disk
942 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
943 f1da30e6 Michael Hanselmann
944 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
945 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
946 ac0930b9 Iustin Pop
947 85f03e0d Michael Hanselmann
    # Add to worker pool
948 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
949 85f03e0d Michael Hanselmann
950 85f03e0d Michael Hanselmann
    return job.id
951 f1da30e6 Michael Hanselmann
952 db37da70 Michael Hanselmann
  @_RequireOpenQueue
953 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
954 ea03467c Iustin Pop
    """Update a job's on disk storage.
955 ea03467c Iustin Pop

956 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
957 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
958 ea03467c Iustin Pop
    nodes.
959 ea03467c Iustin Pop

960 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
961 ea03467c Iustin Pop
    @param job: the changed job
962 ea03467c Iustin Pop

963 ea03467c Iustin Pop
    """
964 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
965 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
966 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
967 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
968 ac0930b9 Iustin Pop
969 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
970 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
971 dfe57c22 Michael Hanselmann
972 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
973 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
974 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
975 5c735209 Iustin Pop
                        timeout):
976 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
977 6c5a7090 Michael Hanselmann

978 6c5a7090 Michael Hanselmann
    @type job_id: string
979 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
980 6c5a7090 Michael Hanselmann
    @type fields: list of strings
981 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
982 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
983 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
984 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
985 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
986 5c735209 Iustin Pop
    @type timeout: float
987 5c735209 Iustin Pop
    @param timeout: maximum time to wait
988 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
989 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
990 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
991 ea03467c Iustin Pop

992 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
993 ea03467c Iustin Pop
        we instead return a special value,
994 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
995 ea03467c Iustin Pop
        as such by the clients
996 6c5a7090 Michael Hanselmann

997 6c5a7090 Michael Hanselmann
    """
998 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
999 5c735209 Iustin Pop
    end_time = time.time() + timeout
1000 dfe57c22 Michael Hanselmann
    while True:
1001 5c735209 Iustin Pop
      delta_time = end_time - time.time()
1002 5c735209 Iustin Pop
      if delta_time < 0:
1003 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
1004 5c735209 Iustin Pop
1005 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1006 6c5a7090 Michael Hanselmann
      if not job:
1007 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
1008 6c5a7090 Michael Hanselmann
        break
1009 dfe57c22 Michael Hanselmann
1010 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
1011 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
1012 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
1013 dfe57c22 Michael Hanselmann
1014 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
1015 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
1016 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
1017 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
1018 dfe57c22 Michael Hanselmann
      # significantly different.
1019 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1020 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1021 dfe57c22 Michael Hanselmann
1022 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
1023 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
1024 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
1025 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
1026 6c5a7090 Michael Hanselmann
        # no changes.
1027 dfe57c22 Michael Hanselmann
        break
1028 dfe57c22 Michael Hanselmann
1029 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
1030 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
1031 6c5a7090 Michael Hanselmann
        break
1032 6c5a7090 Michael Hanselmann
1033 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
1034 6c5a7090 Michael Hanselmann
1035 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
1036 5c735209 Iustin Pop
      job.change.wait(delta_time)
1037 dfe57c22 Michael Hanselmann
1038 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
1039 dfe57c22 Michael Hanselmann
1040 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
1041 dfe57c22 Michael Hanselmann
1042 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1043 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1044 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1045 188c5e0a Michael Hanselmann
    """Cancels a job.
1046 188c5e0a Michael Hanselmann

1047 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1048 ea03467c Iustin Pop

1049 188c5e0a Michael Hanselmann
    @type job_id: string
1050 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1051 188c5e0a Michael Hanselmann

1052 188c5e0a Michael Hanselmann
    """
1053 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1054 188c5e0a Michael Hanselmann
1055 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1056 188c5e0a Michael Hanselmann
    if not job:
1057 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1058 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1059 fbf0262f Michael Hanselmann
1060 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1061 188c5e0a Michael Hanselmann
1062 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1063 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1064 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
1065 fbf0262f Michael Hanselmann
      return (False, "Job %s is no longer in the queue" % job.id)
1066 fbf0262f Michael Hanselmann
1067 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1068 fbf0262f Michael Hanselmann
      self.CancelJobUnlocked(job)
1069 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1070 188c5e0a Michael Hanselmann
1071 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1072 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1073 fbf0262f Michael Hanselmann
      try:
1074 fbf0262f Michael Hanselmann
        for op in job.ops:
1075 fbf0262f Michael Hanselmann
          op.status = constants.OP_STATUS_CANCELING
1076 fbf0262f Michael Hanselmann
      finally:
1077 fbf0262f Michael Hanselmann
        self.UpdateJobUnlocked(job)
1078 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1079 fbf0262f Michael Hanselmann
1080 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1081 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
1082 fbf0262f Michael Hanselmann
    """Marks a job as canceled.
1083 fbf0262f Michael Hanselmann

1084 fbf0262f Michael Hanselmann
    """
1085 85f03e0d Michael Hanselmann
    try:
1086 85f03e0d Michael Hanselmann
      for op in job.ops:
1087 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1088 fbf0262f Michael Hanselmann
        op.result = "Job canceled by request"
1089 85f03e0d Michael Hanselmann
    finally:
1090 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1091 188c5e0a Michael Hanselmann
1092 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1093 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
1094 c609f802 Michael Hanselmann
    """Archives a job.
1095 c609f802 Michael Hanselmann

1096 c609f802 Michael Hanselmann
    @type job_id: string
1097 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
1098 c609f802 Michael Hanselmann

1099 c609f802 Michael Hanselmann
    """
1100 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
1101 c609f802 Michael Hanselmann
1102 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1103 c609f802 Michael Hanselmann
    if not job:
1104 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1105 c609f802 Michael Hanselmann
      return
1106 c609f802 Michael Hanselmann
1107 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1108 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
1109 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
1110 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
1111 c609f802 Michael Hanselmann
      return
1112 c609f802 Michael Hanselmann
1113 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
1114 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
1115 c609f802 Michael Hanselmann
1116 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
1117 c609f802 Michael Hanselmann
1118 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
1119 f1da30e6 Michael Hanselmann
1120 07cd723a Iustin Pop
  @utils.LockedMethod
1121 07cd723a Iustin Pop
  @_RequireOpenQueue
1122 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1123 07cd723a Iustin Pop
    """Archives a job.
1124 07cd723a Iustin Pop

1125 ea03467c Iustin Pop
    This is just a wrapper over L{_ArchiveJobUnlocked}.
1126 ea03467c Iustin Pop

1127 07cd723a Iustin Pop
    @type job_id: string
1128 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1129 07cd723a Iustin Pop

1130 07cd723a Iustin Pop
    """
1131 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
1132 07cd723a Iustin Pop
1133 07cd723a Iustin Pop
  @utils.LockedMethod
1134 07cd723a Iustin Pop
  @_RequireOpenQueue
1135 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
1136 07cd723a Iustin Pop
    """Archives all jobs based on age.
1137 07cd723a Iustin Pop

1138 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1139 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1140 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1141 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1142 07cd723a Iustin Pop

1143 07cd723a Iustin Pop
    @type age: int
1144 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1145 07cd723a Iustin Pop

1146 07cd723a Iustin Pop
    """
1147 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1148 07cd723a Iustin Pop
1149 07cd723a Iustin Pop
    now = time.time()
1150 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
1151 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
1152 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1153 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
1154 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
1155 07cd723a Iustin Pop
        continue
1156 07cd723a Iustin Pop
      if job.end_timestamp is None:
1157 07cd723a Iustin Pop
        if job.start_timestamp is None:
1158 07cd723a Iustin Pop
          job_age = job.received_timestamp
1159 07cd723a Iustin Pop
        else:
1160 07cd723a Iustin Pop
          job_age = job.start_timestamp
1161 07cd723a Iustin Pop
      else:
1162 07cd723a Iustin Pop
        job_age = job.end_timestamp
1163 07cd723a Iustin Pop
1164 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
1165 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
1166 07cd723a Iustin Pop
1167 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1168 ea03467c Iustin Pop
    """Returns information about a job.
1169 ea03467c Iustin Pop

1170 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1171 ea03467c Iustin Pop
    @param job: the job which we query
1172 ea03467c Iustin Pop
    @type fields: list
1173 ea03467c Iustin Pop
    @param fields: names of fields to return
1174 ea03467c Iustin Pop
    @rtype: list
1175 ea03467c Iustin Pop
    @return: list with one element for each field
1176 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1177 ea03467c Iustin Pop
        has been passed
1178 ea03467c Iustin Pop

1179 ea03467c Iustin Pop
    """
1180 e2715f69 Michael Hanselmann
    row = []
1181 e2715f69 Michael Hanselmann
    for fname in fields:
1182 e2715f69 Michael Hanselmann
      if fname == "id":
1183 e2715f69 Michael Hanselmann
        row.append(job.id)
1184 e2715f69 Michael Hanselmann
      elif fname == "status":
1185 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1186 af30b2fd Michael Hanselmann
      elif fname == "ops":
1187 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1188 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1189 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1190 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1191 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1192 5b23c34c Iustin Pop
      elif fname == "oplog":
1193 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1194 c56ec146 Iustin Pop
      elif fname == "opstart":
1195 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1196 c56ec146 Iustin Pop
      elif fname == "opend":
1197 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1198 c56ec146 Iustin Pop
      elif fname == "received_ts":
1199 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1200 c56ec146 Iustin Pop
      elif fname == "start_ts":
1201 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1202 c56ec146 Iustin Pop
      elif fname == "end_ts":
1203 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1204 60dd1473 Iustin Pop
      elif fname == "summary":
1205 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1206 e2715f69 Michael Hanselmann
      else:
1207 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1208 e2715f69 Michael Hanselmann
    return row
1209 e2715f69 Michael Hanselmann
1210 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1211 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1212 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1213 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1214 e2715f69 Michael Hanselmann

1215 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1216 ea03467c Iustin Pop
    processing for each job.
1217 ea03467c Iustin Pop

1218 ea03467c Iustin Pop
    @type job_ids: list
1219 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1220 ea03467c Iustin Pop
    @type fields: list
1221 ea03467c Iustin Pop
    @param fields: names of fields to return
1222 ea03467c Iustin Pop
    @rtype: list
1223 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1224 ea03467c Iustin Pop
        the requested fields
1225 e2715f69 Michael Hanselmann

1226 e2715f69 Michael Hanselmann
    """
1227 85f03e0d Michael Hanselmann
    jobs = []
1228 e2715f69 Michael Hanselmann
1229 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1230 85f03e0d Michael Hanselmann
      if job is None:
1231 85f03e0d Michael Hanselmann
        jobs.append(None)
1232 85f03e0d Michael Hanselmann
      else:
1233 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1234 e2715f69 Michael Hanselmann
1235 85f03e0d Michael Hanselmann
    return jobs
1236 e2715f69 Michael Hanselmann
1237 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1238 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1239 e2715f69 Michael Hanselmann
  def Shutdown(self):
1240 e2715f69 Michael Hanselmann
    """Stops the job queue.
1241 e2715f69 Michael Hanselmann

1242 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1243 ea03467c Iustin Pop

1244 e2715f69 Michael Hanselmann
    """
1245 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1246 85f03e0d Michael Hanselmann
1247 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1248 04ab05ce Michael Hanselmann
    self._queue_lock = None