Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 1492cca7

History | View | Annotate | Download (37.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
# Number of worker threads started for processing jobs.
JOBQUEUE_THREADS = 25
# NOTE(review): presumably the number of archived jobs grouped into one
# archive directory; the code using this constant is not visible here.
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised by L{_JobQueueWorker._NotifyStart} when the opcode about to
  be executed is found in the canceling state; it is caught in
  L{_JobQueueWorker.RunTask}, which then cancels the job.

  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    The new object starts in the queued state, with no result, an
    empty log and no timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The C{input}, C{status}, C{result} and C{log} keys are required;
    the two timestamp keys are optional and default to None.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # __new__ is used so that __init__ (which resets every field) is
    # not run on the restored object
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    The returned dictionary uses the same keys L{Restore} reads.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
131 f1048938 Iustin Pop
132 e2715f69 Michael Hanselmann
133 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: the highest log serial number used so far in
      this job (0 if nothing was logged yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if the list of opcodes is empty

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes; it is built on the queue's own lock
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    The C{id}, C{run_op_index} and C{ops} keys are required; the
    three timestamp keys are optional and default to None.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    # The log serial is not stored separately; it is recomputed as the
    # highest serial found in the logs of all restored opcodes
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    The queue, the log serial and the condition variable are not part
    of the serialized state.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceling, cancelled, or finished with error
        opcode, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # A job status is returned here, so the JOB_STATUS_* constant
        # is used like in all the other branches
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    The entries are collected opcode by opcode, in the order of
    C{self.ops}.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries
306 6c5a7090 Michael Hanselmann
307 f1048938 Iustin Pop
308 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  @ivar queue: the queue of the job being processed; set by L{RunTask}
  @ivar opcode: the L{_QueuedOpCode} being executed; set by L{RunTask}
      right before the opcode is handed to the processor

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was put in the canceling state
        in the meantime

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    The opcodes are executed in order; the first failing opcode stops
    the job. Exceptions are logged and not propagated to the caller.
    When the job ends (for whatever reason) its running opcode index
    is reset, its end timestamp is set and the job is written out.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    # Imported locally as it is only needed for the exception lookup below
    import sys

    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              Called either with the message only, or with the log
              type and the message.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                # Wake up everybody waiting for changes of this job
                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception:
            # The exception object is fetched via sys.exc_info() as this
            # spelling is accepted by both Python 2 and Python 3
            err = sys.exc_info()[1]
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                # The job is written out even if recording the error failed
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError:
        logging.exception("Ganeti exception")
      except:
        # Last-resort handler: whatever goes wrong is only logged here
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          # No opcode is being executed anymore; this must be the same
          # attribute that is set above and written by _QueuedJob.Serialize
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
454 e2715f69 Michael Hanselmann
455 e2715f69 Michael Hanselmann
456 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  The pool is created with L{JOBQUEUE_THREADS} workers of the
  L{_JobQueueWorker} class.

  @ivar queue: the queue passed to the constructor; the workers read
      its C{context} attribute when they start processing a job

  """
  def __init__(self, queue):
    """Constructor for the _JobQueueWorkerPool.

    @param queue: the queue this pool belongs to

    """
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
464 e2715f69 Michael Hanselmann
465 e2715f69 Michael Hanselmann
466 85f03e0d Michael Hanselmann
class JobQueue(object):
467 ea03467c Iustin Pop
  """Queue used to manage the jobs.
468 ea03467c Iustin Pop

469 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
470 ea03467c Iustin Pop

471 ea03467c Iustin Pop
  """
472 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
473 f1da30e6 Michael Hanselmann
474 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
475 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
476 db37da70 Michael Hanselmann

477 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
478 ea03467c Iustin Pop
    functions usually called from other classes.
479 db37da70 Michael Hanselmann

480 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
481 db37da70 Michael Hanselmann

482 ea03467c Iustin Pop
    Example::
483 db37da70 Michael Hanselmann
      @utils.LockedMethod
484 db37da70 Michael Hanselmann
      @_RequireOpenQueue
485 db37da70 Michael Hanselmann
      def Example(self):
486 db37da70 Michael Hanselmann
        pass
487 db37da70 Michael Hanselmann

488 db37da70 Michael Hanselmann
    """
489 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
490 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
491 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
492 db37da70 Michael Hanselmann
    return wrapper
493 db37da70 Michael Hanselmann
494 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # Cache of loaded jobs keyed by job id; only weak references are held
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (name -> primary IP, master candidates only)
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          # (and always for the last job)
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          # Queued jobs are handed to the worker pool
          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          # Jobs found in an in-progress state get all their opcodes
          # marked as failed and are written back
          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Stop the worker pool on any error, then let the error propagate
      self._wpool.TerminateWorkers()
      raise
585 85f03e0d Michael Hanselmann
586 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
587 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
588 99aabbed Iustin Pop
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A node which is not
    a master candidate is dropped from the replication list; a master
    candidate gets all current job files plus the serial file uploaded
    and is then added to the replication list.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    new_name = node.name
    assert new_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(new_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(new_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs, followed by the
    # current serial file
    to_upload = []
    for job_id in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(job_id))
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Read file content
      handle = open(path, "r")
      try:
        payload = handle.read()
      finally:
        handle.close()

      upload_result = rpc.RpcRunner.call_jobqueue_update([new_name],
                                                         [node.primary_ip],
                                                         path, payload)
      if not upload_result[new_name]:
        logging.error("Failed to upload %s to %s", path, new_name)

    self._nodes[new_name] = node.primary_ip
628 d2e03a33 Michael Hanselmann
629 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
630 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
631 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Names which are not in the node list are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)
643 23752136 Michael Hanselmann
644 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
645 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
646 ea03467c Iustin Pop

647 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
648 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
649 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
650 ea03467c Iustin Pop

651 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
652 ea03467c Iustin Pop
    @type nodes: list
653 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
654 ea03467c Iustin Pop
    @type failmsg: str
655 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
656 ea03467c Iustin Pop

657 ea03467c Iustin Pop
    """
658 e74798c1 Michael Hanselmann
    failed = []
659 e74798c1 Michael Hanselmann
    success = []
660 e74798c1 Michael Hanselmann
661 e74798c1 Michael Hanselmann
    for node in nodes:
662 e74798c1 Michael Hanselmann
      if result[node]:
663 e74798c1 Michael Hanselmann
        success.append(node)
664 e74798c1 Michael Hanselmann
      else:
665 e74798c1 Michael Hanselmann
        failed.append(node)
666 e74798c1 Michael Hanselmann
667 e74798c1 Michael Hanselmann
    if failed:
668 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
669 e74798c1 Michael Hanselmann
670 e74798c1 Michael Hanselmann
    # +1 for the master node
671 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
672 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
673 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
674 e74798c1 Michael Hanselmann
675 99aabbed Iustin Pop
  def _GetNodeIp(self):
676 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
677 99aabbed Iustin Pop

678 ea03467c Iustin Pop
    @rtype: (list, list)
679 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
680 ea03467c Iustin Pop
        names and the second one with the node addresses
681 ea03467c Iustin Pop

682 99aabbed Iustin Pop
    """
683 99aabbed Iustin Pop
    name_list = self._nodes.keys()
684 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
685 99aabbed Iustin Pop
    return name_list, addr_list
686 99aabbed Iustin Pop
687 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The local file is replaced first; afterwards the same contents are
    sent to all nodes in the replication list and the RPC result is
    checked (failures are logged).

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
705 23752136 Michael Hanselmann
706 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are done locally first; afterwards the same list of
    renames is sent to all nodes in the replication list and the RPC
    result is checked (failures are logged).

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... and on all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
724 abc1f2ce Michael Hanselmann
725 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job id is not an integer
        or is negative

    """
    is_numeric = isinstance(job_id, (int, long))
    if not is_numeric:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    elif job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
744 85f03e0d Michael Hanselmann
745 58b22b6e Michael Hanselmann
  @classmethod
746 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    The directory name is the job id divided (rounding down) by
    C{JOBS_PER_ARCHIVE_DIRECTORY}.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: the result must stay an integer even if true
    # division is in effect, otherwise the name would look like "1.2345"
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
756 58b22b6e Michael Hanselmann
757 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster. The new
    serial number is written to (and replicated via) the serial file
    before it is remembered in memory.

    @rtype: str
    @return: a string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # Write to file
    contents = "%s\n" % next_serial
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        contents)

    # Keep it only if we were able to write the file
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
777 f1da30e6 Michael Hanselmann
778 85f03e0d Michael Hanselmann
  @staticmethod
779 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file, inside the queue directory

    """
    queue_dir = constants.QUEUE_DIR
    file_name = "job-%s" % job_id
    return os.path.join(queue_dir, file_name)
789 f1da30e6 Michael Hanselmann
790 58b22b6e Michael Hanselmann
  @classmethod
791 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    relative = "%s/job-%s" % (archive_subdir, job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, relative)
802 0cb94105 Michael Hanselmann
803 85f03e0d Michael Hanselmann
  @classmethod
804 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
805 ea03467c Iustin Pop
    """Extract the job id from a filename.
806 ea03467c Iustin Pop

807 ea03467c Iustin Pop
    @type name: str
808 ea03467c Iustin Pop
    @param name: the job filename
809 ea03467c Iustin Pop
    @rtype: job id or None
810 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
811 ea03467c Iustin Pop
        or None if the filename does not represent a valid
812 ea03467c Iustin Pop
        job file
813 ea03467c Iustin Pop

814 ea03467c Iustin Pop
    """
815 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
816 fae737ac Michael Hanselmann
    if m:
817 fae737ac Michael Hanselmann
      return m.group(1)
818 fae737ac Michael Hanselmann
    else:
819 fae737ac Michael Hanselmann
      return None
820 fae737ac Michael Hanselmann
821 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs, sorted with L{utils.NiceSort}

    """
    job_ids = []
    for file_name in self._ListJobFiles():
      job_ids.append(self._ExtractJobID(file_name))
    return utils.NiceSort(job_ids)
838 911a495b Iustin Pop
839 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
    """Returns the list of current job files.

    Only the visible files in the queue directory whose name matches
    C{_RE_JOB_FILE} are returned.

    @rtype: list
    @return: the list of job file names

    """
    visible = utils.ListVisibleFiles(constants.QUEUE_DIR)
    job_files = []
    for entry in visible:
      if self._RE_JOB_FILE.match(entry):
        job_files.append(entry)
    return job_files
848 f1da30e6 Michael Hanselmann
849 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
850 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
851 ea03467c Iustin Pop

852 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
853 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
854 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
855 ea03467c Iustin Pop

856 ea03467c Iustin Pop
    @param job_id: the job id
857 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
858 ea03467c Iustin Pop
    @return: either None or the job object
859 ea03467c Iustin Pop

860 ea03467c Iustin Pop
    """
861 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
862 5685c1a5 Michael Hanselmann
    if job:
863 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
864 5685c1a5 Michael Hanselmann
      return job
865 ac0930b9 Iustin Pop
866 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
867 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
868 f1da30e6 Michael Hanselmann
    try:
869 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
870 f1da30e6 Michael Hanselmann
    except IOError, err:
871 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
872 f1da30e6 Michael Hanselmann
        return None
873 f1da30e6 Michael Hanselmann
      raise
874 f1da30e6 Michael Hanselmann
    try:
875 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
876 f1da30e6 Michael Hanselmann
    finally:
877 f1da30e6 Michael Hanselmann
      fd.close()
878 f1da30e6 Michael Hanselmann
879 94ed59a5 Iustin Pop
    try:
880 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
881 94ed59a5 Iustin Pop
    except Exception, err:
882 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
883 94ed59a5 Iustin Pop
      if filepath == new_path:
884 94ed59a5 Iustin Pop
        # job already archived (future case)
885 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
886 94ed59a5 Iustin Pop
      else:
887 94ed59a5 Iustin Pop
        # non-archived case
888 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
889 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
890 94ed59a5 Iustin Pop
      return None
891 94ed59a5 Iustin Pop
892 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
893 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
894 ac0930b9 Iustin Pop
    return job
895 f1da30e6 Michael Hanselmann
896 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
897 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
898 ea03467c Iustin Pop

899 ea03467c Iustin Pop
    @type job_ids: list
900 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
901 ea03467c Iustin Pop
        or a list of job IDs
902 ea03467c Iustin Pop
    @rtype: list
903 ea03467c Iustin Pop
    @return: the list of job objects
904 ea03467c Iustin Pop

905 ea03467c Iustin Pop
    """
906 911a495b Iustin Pop
    if not job_ids:
907 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
908 f1da30e6 Michael Hanselmann
909 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
910 f1da30e6 Michael Hanselmann
911 686d7433 Iustin Pop
  @staticmethod
912 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
923 686d7433 Iustin Pop
924 3ccafd0e Iustin Pop
  @staticmethod
925 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    if not drain_flag:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    else:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    return True
940 3ccafd0e Iustin Pop
941 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
942 db37da70 Michael Hanselmann
  @_RequireOpenQueue
943 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining
    @raise errors.JobQueueFull: if the number of job files has reached
        the hard limit

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    queued_files = len(self._ListJobFiles())
    if queued_files >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #queued_files = ...
      pass

    if queued_files >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier and build the job object
    new_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, new_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # Add to worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
984 f1da30e6 Michael Hanselmann
985 db37da70 Michael Hanselmann
  @_RequireOpenQueue
986 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes. Afterwards everything waiting on the job's C{change}
    condition is notified.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Notify waiters about potential changes
    job.change.notifyAll()
1004 dfe57c22 Michael Hanselmann
1005 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
1006 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
1007 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries) or None
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

        if the job cannot be loaded, None is returned

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      # Recomputed on every pass so that repeated wake-ups never extend the
      # overall deadline
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        # There is nothing to report for a job that cannot be loaded. Leaving
        # the loop instead would hit the final return with job_info and
        # log_entries either unbound (first pass) or stale (later passes).
        logging.debug("Job %s not found", job_id)
        return None

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
1074 dfe57c22 Michael Hanselmann
1075 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1076 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1077 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
    """Cancels a job.

    A queued job is canceled right away; for a job waiting for locks all
    opcodes are flagged as canceling and the job is written back. Jobs in
    any other state are refused.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple (bool, string)
    @return: whether the request was accepted, and a message describing
        the outcome

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for queued_op in job.ops:
          queued_op.status = constants.OP_STATUS_CANCELING
      finally:
        # Written back even if flagging the opcodes was interrupted
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Neither queued nor waiting for locks
    logging.debug("Job %s is no longer in the queue", job.id)
    return (False, "Job %s is no longer in the queue" % job.id)
1112 fbf0262f Michael Hanselmann
1113 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1114 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    Every opcode of the job gets the error status and a fixed result
    message; afterwards the job is handed to L{UpdateJobUnlocked}.

    @param job: the job to be marked; its C{ops} are modified in place

    """
    cancel_msg = "Job canceled by request"
    try:
      for queued_op in job.ops:
        queued_op.status = constants.OP_STATUS_ERROR
        queued_op.result = cancel_msg
    finally:
      # The update is done even if marking the opcodes was interrupted
      self.UpdateJobUnlocked(job)
1124 188c5e0a Michael Hanselmann
1125 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1126 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1127 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1128 c609f802 Michael Hanselmann

1129 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1130 25e7b43f Iustin Pop
    @param jobs: Job objects
1131 d7fd1f28 Michael Hanselmann
    @rtype: int
1132 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1133 c609f802 Michael Hanselmann

1134 c609f802 Michael Hanselmann
    """
1135 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1136 d7fd1f28 Michael Hanselmann
    rename_files = []
1137 d7fd1f28 Michael Hanselmann
    for job in jobs:
1138 d7fd1f28 Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1139 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_SUCCESS,
1140 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_ERROR):
1141 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1142 d7fd1f28 Michael Hanselmann
        continue
1143 c609f802 Michael Hanselmann
1144 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1145 c609f802 Michael Hanselmann
1146 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1147 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1148 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1149 c609f802 Michael Hanselmann
1150 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1151 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1152 f1da30e6 Michael Hanselmann
1153 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1154 d7fd1f28 Michael Hanselmann
                  ", ".join(job.id for job in archive_jobs))
1155 d7fd1f28 Michael Hanselmann
1156 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1157 78d12585 Michael Hanselmann
1158 07cd723a Iustin Pop
  @utils.LockedMethod
1159 07cd723a Iustin Pop
  @_RequireOpenQueue
1160 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    # The batch helper returns the number of jobs it archived, so exactly
    # one means this job was archived
    return self._ArchiveJobsUnlocked([job]) == 1
1179 07cd723a Iustin Pop
1180 07cd723a Iustin Pop
  @utils.LockedMethod
1181 07cd723a Iustin Pop
  @_RequireOpenQueue
1182 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and if that is missing too, the
    received timestamp. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int or float
    @param timeout: the maximum time, in seconds, to spend examining jobs
    @rtype: tuple (int, int)
    @return: the number of archived jobs, and the number of jobs which
        were not examined because the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    # Number of job IDs examined so far; stays at zero for an empty queue so
    # that the count of unexamined jobs can never become negative
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Only counted once we are past the timeout check, i.e. when the job
      # really gets examined
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared against the
      # current time (presumably whole seconds -- confirm against the
      # timestamp format used by the job objects)
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    # Jobs selected before the loop ended are still archived
    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1235 07cd723a Iustin Pop
1236 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1237 ea03467c Iustin Pop
    """Returns information about a job.
1238 ea03467c Iustin Pop

1239 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1240 ea03467c Iustin Pop
    @param job: the job which we query
1241 ea03467c Iustin Pop
    @type fields: list
1242 ea03467c Iustin Pop
    @param fields: names of fields to return
1243 ea03467c Iustin Pop
    @rtype: list
1244 ea03467c Iustin Pop
    @return: list with one element for each field
1245 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1246 ea03467c Iustin Pop
        has been passed
1247 ea03467c Iustin Pop

1248 ea03467c Iustin Pop
    """
1249 e2715f69 Michael Hanselmann
    row = []
1250 e2715f69 Michael Hanselmann
    for fname in fields:
1251 e2715f69 Michael Hanselmann
      if fname == "id":
1252 e2715f69 Michael Hanselmann
        row.append(job.id)
1253 e2715f69 Michael Hanselmann
      elif fname == "status":
1254 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1255 af30b2fd Michael Hanselmann
      elif fname == "ops":
1256 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1257 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1258 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1259 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1260 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1261 5b23c34c Iustin Pop
      elif fname == "oplog":
1262 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1263 c56ec146 Iustin Pop
      elif fname == "opstart":
1264 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1265 c56ec146 Iustin Pop
      elif fname == "opend":
1266 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1267 c56ec146 Iustin Pop
      elif fname == "received_ts":
1268 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1269 c56ec146 Iustin Pop
      elif fname == "start_ts":
1270 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1271 c56ec146 Iustin Pop
      elif fname == "end_ts":
1272 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1273 60dd1473 Iustin Pop
      elif fname == "summary":
1274 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1275 e2715f69 Michael Hanselmann
      else:
1276 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1277 e2715f69 Michael Hanselmann
    return row
1278 e2715f69 Michael Hanselmann
1279 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1280 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1281 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    The jobs are obtained via L{_GetJobsUnlocked}; for each of them the
    requested fields are computed by L{_GetJobInfoUnlocked}.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields; a job for which L{_GetJobsUnlocked}
        yields None is reported as None

    """
    result = []

    for job in self._GetJobsUnlocked(job_ids):
      info = None
      if job is not None:
        info = self._GetJobInfoUnlocked(job, fields)
      result.append(info)

    return result
1305 e2715f69 Michael Hanselmann
1306 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1307 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1308 e2715f69 Michael Hanselmann
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    # Stop the workers first; the queue lock is only given up afterwards
    worker_pool = self._wpool
    worker_pool.TerminateWorkers()

    # Close the lock and drop the reference to it
    self._queue_lock.Close()
    self._queue_lock = None