Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ f87b405e

History | View | Annotate | Download (36.3 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
52 e2715f69 Michael Hanselmann
53 498ae1cc Iustin Pop
54 9728ae5d Iustin Pop
class CancelJob(Exception):
55 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
56 fbf0262f Michael Hanselmann

57 fbf0262f Michael Hanselmann
  """
58 fbf0262f Michael Hanselmann
59 fbf0262f Michael Hanselmann
60 70552c46 Michael Hanselmann
def TimeStampNow():
61 ea03467c Iustin Pop
  """Returns the current timestamp.
62 ea03467c Iustin Pop

63 ea03467c Iustin Pop
  @rtype: tuple
64 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
65 ea03467c Iustin Pop

66 ea03467c Iustin Pop
  """
67 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
68 70552c46 Michael Hanselmann
69 70552c46 Michael Hanselmann
70 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
71 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
72 e2715f69 Michael Hanselmann

73 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
74 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
75 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
76 ea03467c Iustin Pop
  @ivar status: the current status
77 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
78 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
79 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
80 f1048938 Iustin Pop

81 e2715f69 Michael Hanselmann
  """
82 85f03e0d Michael Hanselmann
  def __init__(self, op):
83 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
84 ea03467c Iustin Pop

85 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
86 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
87 ea03467c Iustin Pop

88 ea03467c Iustin Pop
    """
89 85f03e0d Michael Hanselmann
    self.input = op
90 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
91 85f03e0d Michael Hanselmann
    self.result = None
92 85f03e0d Michael Hanselmann
    self.log = []
93 70552c46 Michael Hanselmann
    self.start_timestamp = None
94 70552c46 Michael Hanselmann
    self.end_timestamp = None
95 f1da30e6 Michael Hanselmann
96 f1da30e6 Michael Hanselmann
  @classmethod
97 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
98 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
99 ea03467c Iustin Pop

100 ea03467c Iustin Pop
    @type state: dict
101 ea03467c Iustin Pop
    @param state: the serialized state
102 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
103 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    """
106 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
107 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
108 85f03e0d Michael Hanselmann
    obj.status = state["status"]
109 85f03e0d Michael Hanselmann
    obj.result = state["result"]
110 85f03e0d Michael Hanselmann
    obj.log = state["log"]
111 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
112 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
113 f1da30e6 Michael Hanselmann
    return obj
114 f1da30e6 Michael Hanselmann
115 f1da30e6 Michael Hanselmann
  def Serialize(self):
116 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
117 ea03467c Iustin Pop

118 ea03467c Iustin Pop
    @rtype: dict
119 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
120 ea03467c Iustin Pop

121 ea03467c Iustin Pop
    """
122 6c5a7090 Michael Hanselmann
    return {
123 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
124 6c5a7090 Michael Hanselmann
      "status": self.status,
125 6c5a7090 Michael Hanselmann
      "result": self.result,
126 6c5a7090 Michael Hanselmann
      "log": self.log,
127 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
128 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
129 6c5a7090 Michael Hanselmann
      }
130 f1048938 Iustin Pop
131 e2715f69 Michael Hanselmann
132 e2715f69 Michael Hanselmann
class _QueuedJob(object):
133 e2715f69 Michael Hanselmann
  """In-memory job representation.
134 e2715f69 Michael Hanselmann

135 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
136 ea03467c Iustin Pop
  be taken care of by users of this class.
137 ea03467c Iustin Pop

138 ea03467c Iustin Pop
  @type queue: L{JobQueue}
139 ea03467c Iustin Pop
  @ivar queue: the parent queue
140 ea03467c Iustin Pop
  @ivar id: the job ID
141 ea03467c Iustin Pop
  @type ops: list
142 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
143 ea03467c Iustin Pop
  @type run_op_index: int
144 ea03467c Iustin Pop
  @ivar run_op_index: the currently executing opcode, or -1 if
145 ea03467c Iustin Pop
      we didn't yet start executing
146 ea03467c Iustin Pop
  @type log_serial: int
147 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
148 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
149 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
150 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
151 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
152 e2715f69 Michael Hanselmann

153 e2715f69 Michael Hanselmann
  """
154 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
155 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
156 ea03467c Iustin Pop

157 ea03467c Iustin Pop
    @type queue: L{JobQueue}
158 ea03467c Iustin Pop
    @param queue: our parent queue
159 ea03467c Iustin Pop
    @type job_id: job_id
160 ea03467c Iustin Pop
    @param job_id: our job id
161 ea03467c Iustin Pop
    @type ops: list
162 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
163 ea03467c Iustin Pop
        in _QueuedOpCodes
164 ea03467c Iustin Pop

165 ea03467c Iustin Pop
    """
166 e2715f69 Michael Hanselmann
    if not ops:
167 ea03467c Iustin Pop
      # TODO: use a better exception
168 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
169 e2715f69 Michael Hanselmann
170 85f03e0d Michael Hanselmann
    self.queue = queue
171 f1da30e6 Michael Hanselmann
    self.id = job_id
172 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
173 85f03e0d Michael Hanselmann
    self.run_op_index = -1
174 6c5a7090 Michael Hanselmann
    self.log_serial = 0
175 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
176 c56ec146 Iustin Pop
    self.start_timestamp = None
177 c56ec146 Iustin Pop
    self.end_timestamp = None
178 6c5a7090 Michael Hanselmann
179 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
180 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
181 f1da30e6 Michael Hanselmann
182 f1da30e6 Michael Hanselmann
  @classmethod
183 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
184 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
185 ea03467c Iustin Pop

186 ea03467c Iustin Pop
    @type queue: L{JobQueue}
187 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
188 ea03467c Iustin Pop
    @type state: dict
189 ea03467c Iustin Pop
    @param state: the serialized state
190 ea03467c Iustin Pop
    @rtype: _JobQueue
191 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
192 ea03467c Iustin Pop

193 ea03467c Iustin Pop
    """
194 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
195 85f03e0d Michael Hanselmann
    obj.queue = queue
196 85f03e0d Michael Hanselmann
    obj.id = state["id"]
197 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
198 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
199 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
200 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
201 6c5a7090 Michael Hanselmann
202 6c5a7090 Michael Hanselmann
    obj.ops = []
203 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
204 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
205 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
206 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
207 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
208 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
209 6c5a7090 Michael Hanselmann
210 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
211 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
212 6c5a7090 Michael Hanselmann
213 f1da30e6 Michael Hanselmann
    return obj
214 f1da30e6 Michael Hanselmann
215 f1da30e6 Michael Hanselmann
  def Serialize(self):
216 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
217 ea03467c Iustin Pop

218 ea03467c Iustin Pop
    @rtype: dict
219 ea03467c Iustin Pop
    @return: the serialized state
220 ea03467c Iustin Pop

221 ea03467c Iustin Pop
    """
222 f1da30e6 Michael Hanselmann
    return {
223 f1da30e6 Michael Hanselmann
      "id": self.id,
224 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
225 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
226 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
227 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
228 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
229 f1da30e6 Michael Hanselmann
      }
230 f1da30e6 Michael Hanselmann
231 85f03e0d Michael Hanselmann
  def CalcStatus(self):
232 ea03467c Iustin Pop
    """Compute the status of this job.
233 ea03467c Iustin Pop

234 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
235 ea03467c Iustin Pop
    based on their status, computes the job status.
236 ea03467c Iustin Pop

237 ea03467c Iustin Pop
    The algorithm is:
238 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
239 ea03467c Iustin Pop
        status will be the same
240 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
241 ea03467c Iustin Pop
          - waitlock
242 fbf0262f Michael Hanselmann
          - canceling
243 ea03467c Iustin Pop
          - running
244 ea03467c Iustin Pop

245 ea03467c Iustin Pop
        will determine the job status
246 ea03467c Iustin Pop

247 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
248 ea03467c Iustin Pop
        and the job status will be the same
249 ea03467c Iustin Pop

250 ea03467c Iustin Pop
    @return: the job status
251 ea03467c Iustin Pop

252 ea03467c Iustin Pop
    """
253 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
254 e2715f69 Michael Hanselmann
255 e2715f69 Michael Hanselmann
    all_success = True
256 85f03e0d Michael Hanselmann
    for op in self.ops:
257 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
258 e2715f69 Michael Hanselmann
        continue
259 e2715f69 Michael Hanselmann
260 e2715f69 Michael Hanselmann
      all_success = False
261 e2715f69 Michael Hanselmann
262 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
263 e2715f69 Michael Hanselmann
        pass
264 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
265 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
266 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
267 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
268 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
269 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
270 fbf0262f Michael Hanselmann
        break
271 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
272 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
273 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
274 f1da30e6 Michael Hanselmann
        break
275 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
276 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
277 4cb1d919 Michael Hanselmann
        break
278 e2715f69 Michael Hanselmann
279 e2715f69 Michael Hanselmann
    if all_success:
280 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
281 e2715f69 Michael Hanselmann
282 e2715f69 Michael Hanselmann
    return status
283 e2715f69 Michael Hanselmann
284 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
285 ea03467c Iustin Pop
    """Selectively returns the log entries.
286 ea03467c Iustin Pop

287 ea03467c Iustin Pop
    @type newer_than: None or int
288 ea03467c Iustin Pop
    @param newer_than: if this is None, return all log enties,
289 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
290 ea03467c Iustin Pop
        than this value
291 ea03467c Iustin Pop
    @rtype: list
292 ea03467c Iustin Pop
    @return: the list of the log entries selected
293 ea03467c Iustin Pop

294 ea03467c Iustin Pop
    """
295 6c5a7090 Michael Hanselmann
    if newer_than is None:
296 6c5a7090 Michael Hanselmann
      serial = -1
297 6c5a7090 Michael Hanselmann
    else:
298 6c5a7090 Michael Hanselmann
      serial = newer_than
299 6c5a7090 Michael Hanselmann
300 6c5a7090 Michael Hanselmann
    entries = []
301 6c5a7090 Michael Hanselmann
    for op in self.ops:
302 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
303 6c5a7090 Michael Hanselmann
304 6c5a7090 Michael Hanselmann
    return entries
305 6c5a7090 Michael Hanselmann
306 f1048938 Iustin Pop
307 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
308 ea03467c Iustin Pop
  """The actual job workers.
309 ea03467c Iustin Pop

310 ea03467c Iustin Pop
  """
311 e92376d7 Iustin Pop
  def _NotifyStart(self):
312 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
313 e92376d7 Iustin Pop

314 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
315 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
316 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
317 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
318 e92376d7 Iustin Pop

319 e92376d7 Iustin Pop
    """
320 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
321 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
322 e92376d7 Iustin Pop
323 e92376d7 Iustin Pop
    self.queue.acquire()
324 e92376d7 Iustin Pop
    try:
325 fbf0262f Michael Hanselmann
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
326 fbf0262f Michael Hanselmann
                                    constants.OP_STATUS_CANCELING)
327 fbf0262f Michael Hanselmann
328 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
329 fbf0262f Michael Hanselmann
      if self.opcode.status == constants.OP_STATUS_CANCELING:
330 fbf0262f Michael Hanselmann
        raise CancelJob()
331 fbf0262f Michael Hanselmann
332 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
333 e92376d7 Iustin Pop
    finally:
334 e92376d7 Iustin Pop
      self.queue.release()
335 e92376d7 Iustin Pop
336 85f03e0d Michael Hanselmann
  def RunTask(self, job):
337 e2715f69 Michael Hanselmann
    """Job executor.
338 e2715f69 Michael Hanselmann

339 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
340 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
341 e2715f69 Michael Hanselmann

342 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
343 ea03467c Iustin Pop
    @param job: the job to be processed
344 ea03467c Iustin Pop

345 e2715f69 Michael Hanselmann
    """
346 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
347 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
348 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
349 e92376d7 Iustin Pop
    self.queue = queue = job.queue
350 e2715f69 Michael Hanselmann
    try:
351 85f03e0d Michael Hanselmann
      try:
352 85f03e0d Michael Hanselmann
        count = len(job.ops)
353 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
354 85f03e0d Michael Hanselmann
          try:
355 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
356 85f03e0d Michael Hanselmann
357 85f03e0d Michael Hanselmann
            queue.acquire()
358 85f03e0d Michael Hanselmann
            try:
359 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
360 85f03e0d Michael Hanselmann
              job.run_op_index = idx
361 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
362 85f03e0d Michael Hanselmann
              op.result = None
363 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
364 c56ec146 Iustin Pop
              if idx == 0: # first opcode
365 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
366 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
367 85f03e0d Michael Hanselmann
368 38206f3c Iustin Pop
              input_opcode = op.input
369 85f03e0d Michael Hanselmann
            finally:
370 85f03e0d Michael Hanselmann
              queue.release()
371 85f03e0d Michael Hanselmann
372 dfe57c22 Michael Hanselmann
            def _Log(*args):
373 6c5a7090 Michael Hanselmann
              """Append a log entry.
374 6c5a7090 Michael Hanselmann

375 6c5a7090 Michael Hanselmann
              """
376 6c5a7090 Michael Hanselmann
              assert len(args) < 3
377 6c5a7090 Michael Hanselmann
378 6c5a7090 Michael Hanselmann
              if len(args) == 1:
379 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
380 6c5a7090 Michael Hanselmann
                log_msg = args[0]
381 6c5a7090 Michael Hanselmann
              else:
382 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
383 6c5a7090 Michael Hanselmann
384 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
385 6c5a7090 Michael Hanselmann
              # precision.
386 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
387 dfe57c22 Michael Hanselmann
388 6c5a7090 Michael Hanselmann
              queue.acquire()
389 dfe57c22 Michael Hanselmann
              try:
390 6c5a7090 Michael Hanselmann
                job.log_serial += 1
391 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
392 6c5a7090 Michael Hanselmann
393 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
394 dfe57c22 Michael Hanselmann
              finally:
395 6c5a7090 Michael Hanselmann
                queue.release()
396 dfe57c22 Michael Hanselmann
397 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
398 e92376d7 Iustin Pop
            self.opcode = op
399 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
400 85f03e0d Michael Hanselmann
401 85f03e0d Michael Hanselmann
            queue.acquire()
402 85f03e0d Michael Hanselmann
            try:
403 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
404 85f03e0d Michael Hanselmann
              op.result = result
405 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
406 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
407 85f03e0d Michael Hanselmann
            finally:
408 85f03e0d Michael Hanselmann
              queue.release()
409 85f03e0d Michael Hanselmann
410 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
411 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
412 fbf0262f Michael Hanselmann
          except CancelJob:
413 fbf0262f Michael Hanselmann
            # Will be handled further up
414 fbf0262f Michael Hanselmann
            raise
415 85f03e0d Michael Hanselmann
          except Exception, err:
416 85f03e0d Michael Hanselmann
            queue.acquire()
417 85f03e0d Michael Hanselmann
            try:
418 85f03e0d Michael Hanselmann
              try:
419 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
420 85f03e0d Michael Hanselmann
                op.result = str(err)
421 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
422 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
423 85f03e0d Michael Hanselmann
              finally:
424 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
425 85f03e0d Michael Hanselmann
            finally:
426 85f03e0d Michael Hanselmann
              queue.release()
427 85f03e0d Michael Hanselmann
            raise
428 85f03e0d Michael Hanselmann
429 fbf0262f Michael Hanselmann
      except CancelJob:
430 fbf0262f Michael Hanselmann
        queue.acquire()
431 fbf0262f Michael Hanselmann
        try:
432 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
433 fbf0262f Michael Hanselmann
        finally:
434 fbf0262f Michael Hanselmann
          queue.release()
435 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
436 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
437 85f03e0d Michael Hanselmann
      except:
438 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
439 e2715f69 Michael Hanselmann
    finally:
440 85f03e0d Michael Hanselmann
      queue.acquire()
441 85f03e0d Michael Hanselmann
      try:
442 65548ed5 Michael Hanselmann
        try:
443 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
444 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
445 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
446 65548ed5 Michael Hanselmann
        finally:
447 65548ed5 Michael Hanselmann
          job_id = job.id
448 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
449 85f03e0d Michael Hanselmann
      finally:
450 85f03e0d Michael Hanselmann
        queue.release()
451 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
452 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
453 e2715f69 Michael Hanselmann
454 e2715f69 Michael Hanselmann
455 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
456 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
457 ea03467c Iustin Pop

458 ea03467c Iustin Pop
  """
459 5bdce580 Michael Hanselmann
  def __init__(self, queue):
460 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
461 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
462 5bdce580 Michael Hanselmann
    self.queue = queue
463 e2715f69 Michael Hanselmann
464 e2715f69 Michael Hanselmann
465 85f03e0d Michael Hanselmann
class JobQueue(object):
466 ea03467c Iustin Pop
  """Quue used to manaage the jobs.
467 ea03467c Iustin Pop

468 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
469 ea03467c Iustin Pop

470 ea03467c Iustin Pop
  """
471 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
472 f1da30e6 Michael Hanselmann
473 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
474 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
475 db37da70 Michael Hanselmann

476 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
477 ea03467c Iustin Pop
    functions usually called from other classes.
478 db37da70 Michael Hanselmann

479 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
480 db37da70 Michael Hanselmann

481 ea03467c Iustin Pop
    Example::
482 db37da70 Michael Hanselmann
      @utils.LockedMethod
483 db37da70 Michael Hanselmann
      @_RequireOpenQueue
484 db37da70 Michael Hanselmann
      def Example(self):
485 db37da70 Michael Hanselmann
        pass
486 db37da70 Michael Hanselmann

487 db37da70 Michael Hanselmann
    """
488 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
489 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
490 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
491 db37da70 Michael Hanselmann
    return wrapper
492 db37da70 Michael Hanselmann
493 85f03e0d Michael Hanselmann
  def __init__(self, context):
494 ea03467c Iustin Pop
    """Constructor for JobQueue.
495 ea03467c Iustin Pop

496 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
497 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
498 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
499 ea03467c Iustin Pop
    running).
500 ea03467c Iustin Pop

501 ea03467c Iustin Pop
    @type context: GanetiContext
502 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
503 ea03467c Iustin Pop
        data and other ganeti objects
504 ea03467c Iustin Pop

505 ea03467c Iustin Pop
    """
506 5bdce580 Michael Hanselmann
    self.context = context
507 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
508 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
509 f1da30e6 Michael Hanselmann
510 85f03e0d Michael Hanselmann
    # Locking
511 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
512 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
513 85f03e0d Michael Hanselmann
    self.release = self._lock.release
514 85f03e0d Michael Hanselmann
515 04ab05ce Michael Hanselmann
    # Initialize
516 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
517 f1da30e6 Michael Hanselmann
518 04ab05ce Michael Hanselmann
    # Read serial file
519 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
520 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
521 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
522 c4beba1c Iustin Pop
523 23752136 Michael Hanselmann
    # Get initial list of nodes
524 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
525 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
526 59303563 Iustin Pop
                       if n.master_candidate)
527 8e00939c Michael Hanselmann
528 8e00939c Michael Hanselmann
    # Remove master node
529 8e00939c Michael Hanselmann
    try:
530 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
531 33987705 Iustin Pop
    except KeyError:
532 8e00939c Michael Hanselmann
      pass
533 23752136 Michael Hanselmann
534 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
535 23752136 Michael Hanselmann
536 85f03e0d Michael Hanselmann
    # Setup worker pool
537 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
538 85f03e0d Michael Hanselmann
    try:
539 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
540 16714921 Michael Hanselmann
      # we're still doing our work.
541 16714921 Michael Hanselmann
      self.acquire()
542 16714921 Michael Hanselmann
      try:
543 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
544 711b5124 Michael Hanselmann
545 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
546 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
547 711b5124 Michael Hanselmann
        lastinfo = time.time()
548 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
549 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
550 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
551 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
552 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
553 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
554 711b5124 Michael Hanselmann
            lastinfo = time.time()
555 711b5124 Michael Hanselmann
556 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
557 711b5124 Michael Hanselmann
558 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
559 16714921 Michael Hanselmann
          if job is None:
560 16714921 Michael Hanselmann
            continue
561 94ed59a5 Iustin Pop
562 16714921 Michael Hanselmann
          status = job.CalcStatus()
563 85f03e0d Michael Hanselmann
564 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
565 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
566 85f03e0d Michael Hanselmann
567 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
568 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
569 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
570 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
571 16714921 Michael Hanselmann
            try:
572 16714921 Michael Hanselmann
              for op in job.ops:
573 16714921 Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
574 16714921 Michael Hanselmann
                op.result = "Unclean master daemon shutdown"
575 16714921 Michael Hanselmann
            finally:
576 16714921 Michael Hanselmann
              self.UpdateJobUnlocked(job)
577 711b5124 Michael Hanselmann
578 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
579 16714921 Michael Hanselmann
      finally:
580 16714921 Michael Hanselmann
        self.release()
581 16714921 Michael Hanselmann
    except:
582 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
583 16714921 Michael Hanselmann
      raise
584 85f03e0d Michael Hanselmann
585 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
586 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
587 99aabbed Iustin Pop
  def AddNode(self, node):
588 99aabbed Iustin Pop
    """Register a new node with the queue.
589 99aabbed Iustin Pop

590 99aabbed Iustin Pop
    @type node: L{objects.Node}
591 99aabbed Iustin Pop
    @param node: the node object to be added
592 99aabbed Iustin Pop

593 99aabbed Iustin Pop
    """
594 99aabbed Iustin Pop
    node_name = node.name
595 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
596 23752136 Michael Hanselmann
597 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
598 a3811745 Michael Hanselmann
    rpc.RpcRunner.call_jobqueue_purge(node_name)
599 23752136 Michael Hanselmann
600 59303563 Iustin Pop
    if not node.master_candidate:
601 59303563 Iustin Pop
      # remove if existing, ignoring errors
602 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
603 59303563 Iustin Pop
      # and skip the replication of the job ids
604 59303563 Iustin Pop
      return
605 59303563 Iustin Pop
606 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
607 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
608 23752136 Michael Hanselmann
609 d2e03a33 Michael Hanselmann
    # Upload current serial file
610 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
611 d2e03a33 Michael Hanselmann
612 d2e03a33 Michael Hanselmann
    for file_name in files:
613 9f774ee8 Michael Hanselmann
      # Read file content
614 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
615 9f774ee8 Michael Hanselmann
      try:
616 9f774ee8 Michael Hanselmann
        content = fd.read()
617 9f774ee8 Michael Hanselmann
      finally:
618 9f774ee8 Michael Hanselmann
        fd.close()
619 9f774ee8 Michael Hanselmann
620 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
621 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
622 a3811745 Michael Hanselmann
                                                  file_name, content)
623 d2e03a33 Michael Hanselmann
      if not result[node_name]:
624 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
625 d2e03a33 Michael Hanselmann
626 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
627 d2e03a33 Michael Hanselmann
628 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
629 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
630 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
631 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
632 ea03467c Iustin Pop

633 ea03467c Iustin Pop
    @type node_name: str
634 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
635 ea03467c Iustin Pop

636 ea03467c Iustin Pop
    """
637 23752136 Michael Hanselmann
    try:
638 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
639 99aabbed Iustin Pop
      del self._nodes[node_name]
640 d2e03a33 Michael Hanselmann
    except KeyError:
641 23752136 Michael Hanselmann
      pass
642 23752136 Michael Hanselmann
643 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
644 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
645 ea03467c Iustin Pop

646 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
647 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
648 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
649 ea03467c Iustin Pop

650 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
651 ea03467c Iustin Pop
    @type nodes: list
652 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
653 ea03467c Iustin Pop
    @type failmsg: str
654 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
655 ea03467c Iustin Pop

656 ea03467c Iustin Pop
    """
657 e74798c1 Michael Hanselmann
    failed = []
658 e74798c1 Michael Hanselmann
    success = []
659 e74798c1 Michael Hanselmann
660 e74798c1 Michael Hanselmann
    for node in nodes:
661 e74798c1 Michael Hanselmann
      if result[node]:
662 e74798c1 Michael Hanselmann
        success.append(node)
663 e74798c1 Michael Hanselmann
      else:
664 e74798c1 Michael Hanselmann
        failed.append(node)
665 e74798c1 Michael Hanselmann
666 e74798c1 Michael Hanselmann
    if failed:
667 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
668 e74798c1 Michael Hanselmann
669 e74798c1 Michael Hanselmann
    # +1 for the master node
670 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
671 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
672 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
673 e74798c1 Michael Hanselmann
674 99aabbed Iustin Pop
  def _GetNodeIp(self):
675 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
676 99aabbed Iustin Pop

677 ea03467c Iustin Pop
    @rtype: (list, list)
678 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
679 ea03467c Iustin Pop
        names and the second one with the node addresses
680 ea03467c Iustin Pop

681 99aabbed Iustin Pop
    """
682 99aabbed Iustin Pop
    name_list = self._nodes.keys()
683 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
684 99aabbed Iustin Pop
    return name_list, addr_list
685 99aabbed Iustin Pop
686 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
687 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
688 8e00939c Michael Hanselmann

689 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
690 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
691 ea03467c Iustin Pop

692 ea03467c Iustin Pop
    @type file_name: str
693 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
694 ea03467c Iustin Pop
    @type data: str
695 ea03467c Iustin Pop
    @param data: the new contents of the file
696 ea03467c Iustin Pop

697 8e00939c Michael Hanselmann
    """
698 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
699 8e00939c Michael Hanselmann
700 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
701 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
702 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
703 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
704 23752136 Michael Hanselmann
705 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
706 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
707 ea03467c Iustin Pop

708 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
709 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
710 ea03467c Iustin Pop

711 ea03467c Iustin Pop
    @type old: str
712 ea03467c Iustin Pop
    @param old: the current name of the file
713 ea03467c Iustin Pop
    @type new: str
714 ea03467c Iustin Pop
    @param new: the new name of the file
715 ea03467c Iustin Pop

716 ea03467c Iustin Pop
    """
717 abc1f2ce Michael Hanselmann
    os.rename(old, new)
718 abc1f2ce Michael Hanselmann
719 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
720 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
721 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
722 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
723 abc1f2ce Michael Hanselmann
724 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
725 ea03467c Iustin Pop
    """Convert a job ID to string format.
726 ea03467c Iustin Pop

727 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
728 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
729 ea03467c Iustin Pop
    abstract this change.
730 ea03467c Iustin Pop

731 ea03467c Iustin Pop
    @type job_id: int or long
732 ea03467c Iustin Pop
    @param job_id: the numeric job id
733 ea03467c Iustin Pop
    @rtype: str
734 ea03467c Iustin Pop
    @return: the formatted job id
735 ea03467c Iustin Pop

736 ea03467c Iustin Pop
    """
737 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
738 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
739 85f03e0d Michael Hanselmann
    if job_id < 0:
740 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
741 85f03e0d Michael Hanselmann
742 85f03e0d Michael Hanselmann
    return str(job_id)
743 85f03e0d Michael Hanselmann
744 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
745 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
746 f1da30e6 Michael Hanselmann

747 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
748 f1da30e6 Michael Hanselmann

749 ea03467c Iustin Pop
    @rtype: str
750 ea03467c Iustin Pop
    @return: a string representing the job identifier.
751 f1da30e6 Michael Hanselmann

752 f1da30e6 Michael Hanselmann
    """
753 f1da30e6 Michael Hanselmann
    # New number
754 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
755 f1da30e6 Michael Hanselmann
756 f1da30e6 Michael Hanselmann
    # Write to file
757 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
758 23752136 Michael Hanselmann
                                        "%s\n" % serial)
759 f1da30e6 Michael Hanselmann
760 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
761 f1da30e6 Michael Hanselmann
    self._last_serial = serial
762 f1da30e6 Michael Hanselmann
763 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
764 f1da30e6 Michael Hanselmann
765 85f03e0d Michael Hanselmann
  @staticmethod
766 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
767 ea03467c Iustin Pop
    """Returns the job file for a given job id.
768 ea03467c Iustin Pop

769 ea03467c Iustin Pop
    @type job_id: str
770 ea03467c Iustin Pop
    @param job_id: the job identifier
771 ea03467c Iustin Pop
    @rtype: str
772 ea03467c Iustin Pop
    @return: the path to the job file
773 ea03467c Iustin Pop

774 ea03467c Iustin Pop
    """
775 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
776 f1da30e6 Michael Hanselmann
777 85f03e0d Michael Hanselmann
  @staticmethod
778 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
779 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
780 ea03467c Iustin Pop

781 ea03467c Iustin Pop
    @type job_id: str
782 ea03467c Iustin Pop
    @param job_id: the job identifier
783 ea03467c Iustin Pop
    @rtype: str
784 ea03467c Iustin Pop
    @return: the path to the archived job file
785 ea03467c Iustin Pop

786 ea03467c Iustin Pop
    """
787 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
788 0cb94105 Michael Hanselmann
789 85f03e0d Michael Hanselmann
  @classmethod
790 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
791 ea03467c Iustin Pop
    """Extract the job id from a filename.
792 ea03467c Iustin Pop

793 ea03467c Iustin Pop
    @type name: str
794 ea03467c Iustin Pop
    @param name: the job filename
795 ea03467c Iustin Pop
    @rtype: job id or None
796 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
797 ea03467c Iustin Pop
        or None if the filename does not represent a valid
798 ea03467c Iustin Pop
        job file
799 ea03467c Iustin Pop

800 ea03467c Iustin Pop
    """
801 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
802 fae737ac Michael Hanselmann
    if m:
803 fae737ac Michael Hanselmann
      return m.group(1)
804 fae737ac Michael Hanselmann
    else:
805 fae737ac Michael Hanselmann
      return None
806 fae737ac Michael Hanselmann
807 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
808 911a495b Iustin Pop
    """Return all known job IDs.
809 911a495b Iustin Pop

810 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
811 911a495b Iustin Pop
    included. Currently this argument is unused.
812 911a495b Iustin Pop

813 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
814 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
815 ac0930b9 Iustin Pop
    extra IDs).
816 ac0930b9 Iustin Pop

817 ea03467c Iustin Pop
    @rtype: list
818 ea03467c Iustin Pop
    @return: the list of job IDs
819 ea03467c Iustin Pop

820 911a495b Iustin Pop
    """
821 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
822 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
823 f0d874fe Iustin Pop
    return jlist
824 911a495b Iustin Pop
825 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
826 ea03467c Iustin Pop
    """Returns the list of current job files.
827 ea03467c Iustin Pop

828 ea03467c Iustin Pop
    @rtype: list
829 ea03467c Iustin Pop
    @return: the list of job file names
830 ea03467c Iustin Pop

831 ea03467c Iustin Pop
    """
832 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
833 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
834 f1da30e6 Michael Hanselmann
835 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
836 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
837 ea03467c Iustin Pop

838 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
839 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
840 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
841 ea03467c Iustin Pop

842 ea03467c Iustin Pop
    @param job_id: the job id
843 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
844 ea03467c Iustin Pop
    @return: either None or the job object
845 ea03467c Iustin Pop

846 ea03467c Iustin Pop
    """
847 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
848 5685c1a5 Michael Hanselmann
    if job:
849 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
850 5685c1a5 Michael Hanselmann
      return job
851 ac0930b9 Iustin Pop
852 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
853 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
854 f1da30e6 Michael Hanselmann
    try:
855 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
856 f1da30e6 Michael Hanselmann
    except IOError, err:
857 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
858 f1da30e6 Michael Hanselmann
        return None
859 f1da30e6 Michael Hanselmann
      raise
860 f1da30e6 Michael Hanselmann
    try:
861 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
862 f1da30e6 Michael Hanselmann
    finally:
863 f1da30e6 Michael Hanselmann
      fd.close()
864 f1da30e6 Michael Hanselmann
865 94ed59a5 Iustin Pop
    try:
866 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
867 94ed59a5 Iustin Pop
    except Exception, err:
868 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
869 94ed59a5 Iustin Pop
      if filepath == new_path:
870 94ed59a5 Iustin Pop
        # job already archived (future case)
871 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
872 94ed59a5 Iustin Pop
      else:
873 94ed59a5 Iustin Pop
        # non-archived case
874 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
875 94ed59a5 Iustin Pop
        self._RenameFileUnlocked(filepath, new_path)
876 94ed59a5 Iustin Pop
      return None
877 94ed59a5 Iustin Pop
878 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
879 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
880 ac0930b9 Iustin Pop
    return job
881 f1da30e6 Michael Hanselmann
882 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
883 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
884 ea03467c Iustin Pop

885 ea03467c Iustin Pop
    @type job_ids: list
886 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
887 ea03467c Iustin Pop
        or a list of job IDs
888 ea03467c Iustin Pop
    @rtype: list
889 ea03467c Iustin Pop
    @return: the list of job objects
890 ea03467c Iustin Pop

891 ea03467c Iustin Pop
    """
892 911a495b Iustin Pop
    if not job_ids:
893 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
894 f1da30e6 Michael Hanselmann
895 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
896 f1da30e6 Michael Hanselmann
897 686d7433 Iustin Pop
  @staticmethod
898 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
899 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
900 686d7433 Iustin Pop

901 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
902 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
903 686d7433 Iustin Pop

904 ea03467c Iustin Pop
    @rtype: boolean
905 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
906 ea03467c Iustin Pop

907 686d7433 Iustin Pop
    """
908 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
909 686d7433 Iustin Pop
910 3ccafd0e Iustin Pop
  @staticmethod
911 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
912 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
913 3ccafd0e Iustin Pop

914 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
915 3ccafd0e Iustin Pop
    and in the future we might merge them.
916 3ccafd0e Iustin Pop

917 ea03467c Iustin Pop
    @type drain_flag: boolean
918 ea03467c Iustin Pop
    @param drain_flag: wheter to set or unset the drain flag
919 ea03467c Iustin Pop

920 3ccafd0e Iustin Pop
    """
921 3ccafd0e Iustin Pop
    if drain_flag:
922 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
923 3ccafd0e Iustin Pop
    else:
924 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
925 3ccafd0e Iustin Pop
    return True
926 3ccafd0e Iustin Pop
927 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
928 db37da70 Michael Hanselmann
  @_RequireOpenQueue
929 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
930 85f03e0d Michael Hanselmann
    """Create and store a new job.
931 f1da30e6 Michael Hanselmann

932 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
933 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
934 c3f0a12f Iustin Pop

935 c3f0a12f Iustin Pop
    @type ops: list
936 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
937 ea03467c Iustin Pop
    @rtype: job ID
938 ea03467c Iustin Pop
    @return: the job ID of the newly created job
939 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
940 c3f0a12f Iustin Pop

941 c3f0a12f Iustin Pop
    """
942 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
943 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
944 f87b405e Michael Hanselmann
945 f87b405e Michael Hanselmann
    # Check job queue size
946 f87b405e Michael Hanselmann
    size = len(self._ListJobFiles())
947 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
948 f87b405e Michael Hanselmann
      # TODO: Autoarchive jobs. Make sure it's not done on every job
949 f87b405e Michael Hanselmann
      # submission, though.
950 f87b405e Michael Hanselmann
      #size = ...
951 f87b405e Michael Hanselmann
      pass
952 f87b405e Michael Hanselmann
953 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
954 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
955 f87b405e Michael Hanselmann
956 f1da30e6 Michael Hanselmann
    # Get job identifier
957 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
958 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
959 f1da30e6 Michael Hanselmann
960 f1da30e6 Michael Hanselmann
    # Write to disk
961 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
962 f1da30e6 Michael Hanselmann
963 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
964 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
965 ac0930b9 Iustin Pop
966 85f03e0d Michael Hanselmann
    # Add to worker pool
967 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
968 85f03e0d Michael Hanselmann
969 85f03e0d Michael Hanselmann
    return job.id
970 f1da30e6 Michael Hanselmann
971 db37da70 Michael Hanselmann
  @_RequireOpenQueue
972 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
973 ea03467c Iustin Pop
    """Update a job's on disk storage.
974 ea03467c Iustin Pop

975 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
976 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
977 ea03467c Iustin Pop
    nodes.
978 ea03467c Iustin Pop

979 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
980 ea03467c Iustin Pop
    @param job: the changed job
981 ea03467c Iustin Pop

982 ea03467c Iustin Pop
    """
983 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
984 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
985 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
986 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
987 ac0930b9 Iustin Pop
988 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
989 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
990 dfe57c22 Michael Hanselmann
991 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
992 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
993 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
994 5c735209 Iustin Pop
                        timeout):
995 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
996 6c5a7090 Michael Hanselmann

997 6c5a7090 Michael Hanselmann
    @type job_id: string
998 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
999 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1000 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1001 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1002 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1003 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1004 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1005 5c735209 Iustin Pop
    @type timeout: float
1006 5c735209 Iustin Pop
    @param timeout: maximum time to wait
1007 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1008 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1009 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1010 ea03467c Iustin Pop

1011 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1012 ea03467c Iustin Pop
        we instead return a special value,
1013 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1014 ea03467c Iustin Pop
        as such by the clients
1015 6c5a7090 Michael Hanselmann

1016 6c5a7090 Michael Hanselmann
    """
1017 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
1018 5c735209 Iustin Pop
    end_time = time.time() + timeout
1019 dfe57c22 Michael Hanselmann
    while True:
1020 5c735209 Iustin Pop
      delta_time = end_time - time.time()
1021 5c735209 Iustin Pop
      if delta_time < 0:
1022 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
1023 5c735209 Iustin Pop
1024 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1025 6c5a7090 Michael Hanselmann
      if not job:
1026 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
1027 6c5a7090 Michael Hanselmann
        break
1028 dfe57c22 Michael Hanselmann
1029 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
1030 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
1031 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
1032 dfe57c22 Michael Hanselmann
1033 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
1034 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
1035 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
1036 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
1037 dfe57c22 Michael Hanselmann
      # significantly different.
1038 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1039 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1040 dfe57c22 Michael Hanselmann
1041 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
1042 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
1043 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
1044 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
1045 6c5a7090 Michael Hanselmann
        # no changes.
1046 dfe57c22 Michael Hanselmann
        break
1047 dfe57c22 Michael Hanselmann
1048 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
1049 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
1050 6c5a7090 Michael Hanselmann
        break
1051 6c5a7090 Michael Hanselmann
1052 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
1053 6c5a7090 Michael Hanselmann
1054 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
1055 5c735209 Iustin Pop
      job.change.wait(delta_time)
1056 dfe57c22 Michael Hanselmann
1057 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
1058 dfe57c22 Michael Hanselmann
1059 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
1060 dfe57c22 Michael Hanselmann
1061 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1062 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1063 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1064 188c5e0a Michael Hanselmann
    """Cancels a job.
1065 188c5e0a Michael Hanselmann

1066 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1067 ea03467c Iustin Pop

1068 188c5e0a Michael Hanselmann
    @type job_id: string
1069 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1070 188c5e0a Michael Hanselmann

1071 188c5e0a Michael Hanselmann
    """
1072 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1073 188c5e0a Michael Hanselmann
1074 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1075 188c5e0a Michael Hanselmann
    if not job:
1076 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1077 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1078 fbf0262f Michael Hanselmann
1079 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1080 188c5e0a Michael Hanselmann
1081 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1082 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1083 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
1084 fbf0262f Michael Hanselmann
      return (False, "Job %s is no longer in the queue" % job.id)
1085 fbf0262f Michael Hanselmann
1086 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1087 fbf0262f Michael Hanselmann
      self.CancelJobUnlocked(job)
1088 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1089 188c5e0a Michael Hanselmann
1090 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1091 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1092 fbf0262f Michael Hanselmann
      try:
1093 fbf0262f Michael Hanselmann
        for op in job.ops:
1094 fbf0262f Michael Hanselmann
          op.status = constants.OP_STATUS_CANCELING
1095 fbf0262f Michael Hanselmann
      finally:
1096 fbf0262f Michael Hanselmann
        self.UpdateJobUnlocked(job)
1097 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1098 fbf0262f Michael Hanselmann
1099 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1100 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
1101 fbf0262f Michael Hanselmann
    """Marks a job as canceled.
1102 fbf0262f Michael Hanselmann

1103 fbf0262f Michael Hanselmann
    """
1104 85f03e0d Michael Hanselmann
    try:
1105 85f03e0d Michael Hanselmann
      for op in job.ops:
1106 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1107 fbf0262f Michael Hanselmann
        op.result = "Job canceled by request"
1108 85f03e0d Michael Hanselmann
    finally:
1109 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1110 188c5e0a Michael Hanselmann
1111 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1112 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
1113 c609f802 Michael Hanselmann
    """Archives a job.
1114 c609f802 Michael Hanselmann

1115 c609f802 Michael Hanselmann
    @type job_id: string
1116 c41eea6e Iustin Pop
    @param job_id: the ID of job to be archived
1117 c609f802 Michael Hanselmann

1118 c609f802 Michael Hanselmann
    """
1119 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
1120 c609f802 Michael Hanselmann
1121 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1122 c609f802 Michael Hanselmann
    if not job:
1123 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1124 c609f802 Michael Hanselmann
      return
1125 c609f802 Michael Hanselmann
1126 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1127 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
1128 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
1129 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
1130 c609f802 Michael Hanselmann
      return
1131 c609f802 Michael Hanselmann
1132 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
1133 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
1134 c609f802 Michael Hanselmann
1135 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
1136 c609f802 Michael Hanselmann
1137 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
1138 f1da30e6 Michael Hanselmann
1139 07cd723a Iustin Pop
  @utils.LockedMethod
1140 07cd723a Iustin Pop
  @_RequireOpenQueue
1141 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1142 07cd723a Iustin Pop
    """Archives a job.
1143 07cd723a Iustin Pop

1144 ea03467c Iustin Pop
    This is just a wrapper over L{_ArchiveJobUnlocked}.
1145 ea03467c Iustin Pop

1146 07cd723a Iustin Pop
    @type job_id: string
1147 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1148 07cd723a Iustin Pop

1149 07cd723a Iustin Pop
    """
1150 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
1151 07cd723a Iustin Pop
1152 07cd723a Iustin Pop
  @utils.LockedMethod
1153 07cd723a Iustin Pop
  @_RequireOpenQueue
1154 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
1155 07cd723a Iustin Pop
    """Archives all jobs based on age.
1156 07cd723a Iustin Pop

1157 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1158 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1159 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1160 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1161 07cd723a Iustin Pop

1162 07cd723a Iustin Pop
    @type age: int
1163 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1164 07cd723a Iustin Pop

1165 07cd723a Iustin Pop
    """
1166 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1167 07cd723a Iustin Pop
1168 07cd723a Iustin Pop
    now = time.time()
1169 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
1170 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
1171 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1172 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
1173 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
1174 07cd723a Iustin Pop
        continue
1175 07cd723a Iustin Pop
      if job.end_timestamp is None:
1176 07cd723a Iustin Pop
        if job.start_timestamp is None:
1177 07cd723a Iustin Pop
          job_age = job.received_timestamp
1178 07cd723a Iustin Pop
        else:
1179 07cd723a Iustin Pop
          job_age = job.start_timestamp
1180 07cd723a Iustin Pop
      else:
1181 07cd723a Iustin Pop
        job_age = job.end_timestamp
1182 07cd723a Iustin Pop
1183 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
1184 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
1185 07cd723a Iustin Pop
1186 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1187 ea03467c Iustin Pop
    """Returns information about a job.
1188 ea03467c Iustin Pop

1189 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1190 ea03467c Iustin Pop
    @param job: the job which we query
1191 ea03467c Iustin Pop
    @type fields: list
1192 ea03467c Iustin Pop
    @param fields: names of fields to return
1193 ea03467c Iustin Pop
    @rtype: list
1194 ea03467c Iustin Pop
    @return: list with one element for each field
1195 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1196 ea03467c Iustin Pop
        has been passed
1197 ea03467c Iustin Pop

1198 ea03467c Iustin Pop
    """
1199 e2715f69 Michael Hanselmann
    row = []
1200 e2715f69 Michael Hanselmann
    for fname in fields:
1201 e2715f69 Michael Hanselmann
      if fname == "id":
1202 e2715f69 Michael Hanselmann
        row.append(job.id)
1203 e2715f69 Michael Hanselmann
      elif fname == "status":
1204 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1205 af30b2fd Michael Hanselmann
      elif fname == "ops":
1206 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1207 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1208 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1209 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1210 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1211 5b23c34c Iustin Pop
      elif fname == "oplog":
1212 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1213 c56ec146 Iustin Pop
      elif fname == "opstart":
1214 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1215 c56ec146 Iustin Pop
      elif fname == "opend":
1216 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1217 c56ec146 Iustin Pop
      elif fname == "received_ts":
1218 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1219 c56ec146 Iustin Pop
      elif fname == "start_ts":
1220 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1221 c56ec146 Iustin Pop
      elif fname == "end_ts":
1222 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1223 60dd1473 Iustin Pop
      elif fname == "summary":
1224 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1225 e2715f69 Michael Hanselmann
      else:
1226 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1227 e2715f69 Michael Hanselmann
    return row
1228 e2715f69 Michael Hanselmann
1229 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1230 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1231 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1232 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1233 e2715f69 Michael Hanselmann

1234 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1235 ea03467c Iustin Pop
    processing for each job.
1236 ea03467c Iustin Pop

1237 ea03467c Iustin Pop
    @type job_ids: list
1238 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1239 ea03467c Iustin Pop
    @type fields: list
1240 ea03467c Iustin Pop
    @param fields: names of fields to return
1241 ea03467c Iustin Pop
    @rtype: list
1242 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1243 ea03467c Iustin Pop
        the requested fields
1244 e2715f69 Michael Hanselmann

1245 e2715f69 Michael Hanselmann
    """
1246 85f03e0d Michael Hanselmann
    jobs = []
1247 e2715f69 Michael Hanselmann
1248 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1249 85f03e0d Michael Hanselmann
      if job is None:
1250 85f03e0d Michael Hanselmann
        jobs.append(None)
1251 85f03e0d Michael Hanselmann
      else:
1252 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1253 e2715f69 Michael Hanselmann
1254 85f03e0d Michael Hanselmann
    return jobs
1255 e2715f69 Michael Hanselmann
1256 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1257 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1258 e2715f69 Michael Hanselmann
  def Shutdown(self):
1259 e2715f69 Michael Hanselmann
    """Stops the job queue.
1260 e2715f69 Michael Hanselmann

1261 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1262 ea03467c Iustin Pop

1263 e2715f69 Michael Hanselmann
    """
1264 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1265 85f03e0d Michael Hanselmann
1266 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1267 04ab05ce Michael Hanselmann
    self._queue_lock = None