Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 38e250ba

History | View | Annotate | Download (37.8 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
56 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
57 fbf0262f Michael Hanselmann

58 fbf0262f Michael Hanselmann
  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
62 ea03467c Iustin Pop
  """Returns the current timestamp.
63 ea03467c Iustin Pop

64 ea03467c Iustin Pop
  @rtype: tuple
65 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
66 ea03467c Iustin Pop

67 ea03467c Iustin Pop
  """
68 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
72 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
73 e2715f69 Michael Hanselmann

74 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
75 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
76 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
77 ea03467c Iustin Pop
  @ivar status: the current status
78 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
79 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
80 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
81 f1048938 Iustin Pop

82 e2715f69 Michael Hanselmann
  """
83 85f03e0d Michael Hanselmann
  def __init__(self, op):
84 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
85 ea03467c Iustin Pop

86 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
87 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
88 ea03467c Iustin Pop

89 ea03467c Iustin Pop
    """
90 85f03e0d Michael Hanselmann
    self.input = op
91 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
92 85f03e0d Michael Hanselmann
    self.result = None
93 85f03e0d Michael Hanselmann
    self.log = []
94 70552c46 Michael Hanselmann
    self.start_timestamp = None
95 70552c46 Michael Hanselmann
    self.end_timestamp = None
96 f1da30e6 Michael Hanselmann
97 f1da30e6 Michael Hanselmann
  @classmethod
98 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
99 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
100 ea03467c Iustin Pop

101 ea03467c Iustin Pop
    @type state: dict
102 ea03467c Iustin Pop
    @param state: the serialized state
103 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
104 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
105 ea03467c Iustin Pop

106 ea03467c Iustin Pop
    """
107 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
108 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109 85f03e0d Michael Hanselmann
    obj.status = state["status"]
110 85f03e0d Michael Hanselmann
    obj.result = state["result"]
111 85f03e0d Michael Hanselmann
    obj.log = state["log"]
112 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
113 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
114 f1da30e6 Michael Hanselmann
    return obj
115 f1da30e6 Michael Hanselmann
116 f1da30e6 Michael Hanselmann
  def Serialize(self):
117 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
118 ea03467c Iustin Pop

119 ea03467c Iustin Pop
    @rtype: dict
120 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
121 ea03467c Iustin Pop

122 ea03467c Iustin Pop
    """
123 6c5a7090 Michael Hanselmann
    return {
124 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
125 6c5a7090 Michael Hanselmann
      "status": self.status,
126 6c5a7090 Michael Hanselmann
      "result": self.result,
127 6c5a7090 Michael Hanselmann
      "log": self.log,
128 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
129 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
130 6c5a7090 Michael Hanselmann
      }
131 f1048938 Iustin Pop
132 e2715f69 Michael Hanselmann
133 e2715f69 Michael Hanselmann
class _QueuedJob(object):
134 e2715f69 Michael Hanselmann
  """In-memory job representation.
135 e2715f69 Michael Hanselmann

136 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
137 ea03467c Iustin Pop
  be taken care of by users of this class.
138 ea03467c Iustin Pop

139 ea03467c Iustin Pop
  @type queue: L{JobQueue}
140 ea03467c Iustin Pop
  @ivar queue: the parent queue
141 ea03467c Iustin Pop
  @ivar id: the job ID
142 ea03467c Iustin Pop
  @type ops: list
143 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
144 ea03467c Iustin Pop
  @type run_op_index: int
145 ea03467c Iustin Pop
  @ivar run_op_index: the currently executing opcode, or -1 if
146 ea03467c Iustin Pop
      we didn't yet start executing
147 ea03467c Iustin Pop
  @type log_serial: int
148 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
149 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
150 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
151 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
152 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
153 e2715f69 Michael Hanselmann

154 e2715f69 Michael Hanselmann
  """
155 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
156 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
157 ea03467c Iustin Pop

158 ea03467c Iustin Pop
    @type queue: L{JobQueue}
159 ea03467c Iustin Pop
    @param queue: our parent queue
160 ea03467c Iustin Pop
    @type job_id: job_id
161 ea03467c Iustin Pop
    @param job_id: our job id
162 ea03467c Iustin Pop
    @type ops: list
163 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
164 ea03467c Iustin Pop
        in _QueuedOpCodes
165 ea03467c Iustin Pop

166 ea03467c Iustin Pop
    """
167 e2715f69 Michael Hanselmann
    if not ops:
168 ea03467c Iustin Pop
      # TODO: use a better exception
169 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
170 e2715f69 Michael Hanselmann
171 85f03e0d Michael Hanselmann
    self.queue = queue
172 f1da30e6 Michael Hanselmann
    self.id = job_id
173 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
174 85f03e0d Michael Hanselmann
    self.run_op_index = -1
175 6c5a7090 Michael Hanselmann
    self.log_serial = 0
176 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
177 c56ec146 Iustin Pop
    self.start_timestamp = None
178 c56ec146 Iustin Pop
    self.end_timestamp = None
179 6c5a7090 Michael Hanselmann
180 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
181 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
182 f1da30e6 Michael Hanselmann
183 f1da30e6 Michael Hanselmann
  @classmethod
184 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
185 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
186 ea03467c Iustin Pop

187 ea03467c Iustin Pop
    @type queue: L{JobQueue}
188 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
189 ea03467c Iustin Pop
    @type state: dict
190 ea03467c Iustin Pop
    @param state: the serialized state
191 ea03467c Iustin Pop
    @rtype: _JobQueue
192 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
193 ea03467c Iustin Pop

194 ea03467c Iustin Pop
    """
195 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
196 85f03e0d Michael Hanselmann
    obj.queue = queue
197 85f03e0d Michael Hanselmann
    obj.id = state["id"]
198 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
199 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
200 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
201 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
202 6c5a7090 Michael Hanselmann
203 6c5a7090 Michael Hanselmann
    obj.ops = []
204 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
205 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
206 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
207 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
208 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
209 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
210 6c5a7090 Michael Hanselmann
211 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
212 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
213 6c5a7090 Michael Hanselmann
214 f1da30e6 Michael Hanselmann
    return obj
215 f1da30e6 Michael Hanselmann
216 f1da30e6 Michael Hanselmann
  def Serialize(self):
217 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
218 ea03467c Iustin Pop

219 ea03467c Iustin Pop
    @rtype: dict
220 ea03467c Iustin Pop
    @return: the serialized state
221 ea03467c Iustin Pop

222 ea03467c Iustin Pop
    """
223 f1da30e6 Michael Hanselmann
    return {
224 f1da30e6 Michael Hanselmann
      "id": self.id,
225 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
226 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
227 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
228 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
229 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
230 f1da30e6 Michael Hanselmann
      }
231 f1da30e6 Michael Hanselmann
232 85f03e0d Michael Hanselmann
  def CalcStatus(self):
233 ea03467c Iustin Pop
    """Compute the status of this job.
234 ea03467c Iustin Pop

235 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
236 ea03467c Iustin Pop
    based on their status, computes the job status.
237 ea03467c Iustin Pop

238 ea03467c Iustin Pop
    The algorithm is:
239 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
240 ea03467c Iustin Pop
        status will be the same
241 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
242 ea03467c Iustin Pop
          - waitlock
243 fbf0262f Michael Hanselmann
          - canceling
244 ea03467c Iustin Pop
          - running
245 ea03467c Iustin Pop

246 ea03467c Iustin Pop
        will determine the job status
247 ea03467c Iustin Pop

248 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
249 ea03467c Iustin Pop
        and the job status will be the same
250 ea03467c Iustin Pop

251 ea03467c Iustin Pop
    @return: the job status
252 ea03467c Iustin Pop

253 ea03467c Iustin Pop
    """
254 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
255 e2715f69 Michael Hanselmann
256 e2715f69 Michael Hanselmann
    all_success = True
257 85f03e0d Michael Hanselmann
    for op in self.ops:
258 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
259 e2715f69 Michael Hanselmann
        continue
260 e2715f69 Michael Hanselmann
261 e2715f69 Michael Hanselmann
      all_success = False
262 e2715f69 Michael Hanselmann
263 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
264 e2715f69 Michael Hanselmann
        pass
265 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
266 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
267 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
268 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
269 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
270 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
271 fbf0262f Michael Hanselmann
        break
272 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
273 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
274 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
275 f1da30e6 Michael Hanselmann
        break
276 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
277 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
278 4cb1d919 Michael Hanselmann
        break
279 e2715f69 Michael Hanselmann
280 e2715f69 Michael Hanselmann
    if all_success:
281 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
282 e2715f69 Michael Hanselmann
283 e2715f69 Michael Hanselmann
    return status
284 e2715f69 Michael Hanselmann
285 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
286 ea03467c Iustin Pop
    """Selectively returns the log entries.
287 ea03467c Iustin Pop

288 ea03467c Iustin Pop
    @type newer_than: None or int
289 ea03467c Iustin Pop
    @param newer_than: if this is None, return all log enties,
290 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
291 ea03467c Iustin Pop
        than this value
292 ea03467c Iustin Pop
    @rtype: list
293 ea03467c Iustin Pop
    @return: the list of the log entries selected
294 ea03467c Iustin Pop

295 ea03467c Iustin Pop
    """
296 6c5a7090 Michael Hanselmann
    if newer_than is None:
297 6c5a7090 Michael Hanselmann
      serial = -1
298 6c5a7090 Michael Hanselmann
    else:
299 6c5a7090 Michael Hanselmann
      serial = newer_than
300 6c5a7090 Michael Hanselmann
301 6c5a7090 Michael Hanselmann
    entries = []
302 6c5a7090 Michael Hanselmann
    for op in self.ops:
303 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304 6c5a7090 Michael Hanselmann
305 6c5a7090 Michael Hanselmann
    return entries
306 6c5a7090 Michael Hanselmann
307 f1048938 Iustin Pop
308 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
309 ea03467c Iustin Pop
  """The actual job workers.
310 ea03467c Iustin Pop

311 ea03467c Iustin Pop
  """
312 e92376d7 Iustin Pop
  def _NotifyStart(self):
313 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
314 e92376d7 Iustin Pop

315 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
316 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
317 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
318 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319 e92376d7 Iustin Pop

320 e92376d7 Iustin Pop
    """
321 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
322 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
323 e92376d7 Iustin Pop
324 e92376d7 Iustin Pop
    self.queue.acquire()
325 e92376d7 Iustin Pop
    try:
326 fbf0262f Michael Hanselmann
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327 fbf0262f Michael Hanselmann
                                    constants.OP_STATUS_CANCELING)
328 fbf0262f Michael Hanselmann
329 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
330 fbf0262f Michael Hanselmann
      if self.opcode.status == constants.OP_STATUS_CANCELING:
331 fbf0262f Michael Hanselmann
        raise CancelJob()
332 fbf0262f Michael Hanselmann
333 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
334 e92376d7 Iustin Pop
    finally:
335 e92376d7 Iustin Pop
      self.queue.release()
336 e92376d7 Iustin Pop
337 85f03e0d Michael Hanselmann
  def RunTask(self, job):
338 e2715f69 Michael Hanselmann
    """Job executor.
339 e2715f69 Michael Hanselmann

340 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
341 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
342 e2715f69 Michael Hanselmann

343 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
344 ea03467c Iustin Pop
    @param job: the job to be processed
345 ea03467c Iustin Pop

346 e2715f69 Michael Hanselmann
    """
347 d21d09d6 Iustin Pop
    logging.info("Worker %s processing job %s",
348 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
349 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
350 e92376d7 Iustin Pop
    self.queue = queue = job.queue
351 e2715f69 Michael Hanselmann
    try:
352 85f03e0d Michael Hanselmann
      try:
353 85f03e0d Michael Hanselmann
        count = len(job.ops)
354 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
355 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
356 85f03e0d Michael Hanselmann
          try:
357 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
358 d21d09d6 Iustin Pop
                         op_summary)
359 85f03e0d Michael Hanselmann
360 85f03e0d Michael Hanselmann
            queue.acquire()
361 85f03e0d Michael Hanselmann
            try:
362 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
363 85f03e0d Michael Hanselmann
              job.run_op_index = idx
364 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
365 85f03e0d Michael Hanselmann
              op.result = None
366 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
367 c56ec146 Iustin Pop
              if idx == 0: # first opcode
368 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
369 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
370 85f03e0d Michael Hanselmann
371 38206f3c Iustin Pop
              input_opcode = op.input
372 85f03e0d Michael Hanselmann
            finally:
373 85f03e0d Michael Hanselmann
              queue.release()
374 85f03e0d Michael Hanselmann
375 dfe57c22 Michael Hanselmann
            def _Log(*args):
376 6c5a7090 Michael Hanselmann
              """Append a log entry.
377 6c5a7090 Michael Hanselmann

378 6c5a7090 Michael Hanselmann
              """
379 6c5a7090 Michael Hanselmann
              assert len(args) < 3
380 6c5a7090 Michael Hanselmann
381 6c5a7090 Michael Hanselmann
              if len(args) == 1:
382 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
383 6c5a7090 Michael Hanselmann
                log_msg = args[0]
384 6c5a7090 Michael Hanselmann
              else:
385 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
386 6c5a7090 Michael Hanselmann
387 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
388 6c5a7090 Michael Hanselmann
              # precision.
389 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
390 dfe57c22 Michael Hanselmann
391 6c5a7090 Michael Hanselmann
              queue.acquire()
392 dfe57c22 Michael Hanselmann
              try:
393 6c5a7090 Michael Hanselmann
                job.log_serial += 1
394 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
395 6c5a7090 Michael Hanselmann
396 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
397 dfe57c22 Michael Hanselmann
              finally:
398 6c5a7090 Michael Hanselmann
                queue.release()
399 dfe57c22 Michael Hanselmann
400 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
401 e92376d7 Iustin Pop
            self.opcode = op
402 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
403 85f03e0d Michael Hanselmann
404 85f03e0d Michael Hanselmann
            queue.acquire()
405 85f03e0d Michael Hanselmann
            try:
406 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
407 85f03e0d Michael Hanselmann
              op.result = result
408 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
409 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
410 85f03e0d Michael Hanselmann
            finally:
411 85f03e0d Michael Hanselmann
              queue.release()
412 85f03e0d Michael Hanselmann
413 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
414 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
415 fbf0262f Michael Hanselmann
          except CancelJob:
416 fbf0262f Michael Hanselmann
            # Will be handled further up
417 fbf0262f Michael Hanselmann
            raise
418 85f03e0d Michael Hanselmann
          except Exception, err:
419 85f03e0d Michael Hanselmann
            queue.acquire()
420 85f03e0d Michael Hanselmann
            try:
421 85f03e0d Michael Hanselmann
              try:
422 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
423 85f03e0d Michael Hanselmann
                op.result = str(err)
424 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
425 d21d09d6 Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s", idx + 1, count,
426 d21d09d6 Iustin Pop
                             op_summary)
427 85f03e0d Michael Hanselmann
              finally:
428 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
429 85f03e0d Michael Hanselmann
            finally:
430 85f03e0d Michael Hanselmann
              queue.release()
431 85f03e0d Michael Hanselmann
            raise
432 85f03e0d Michael Hanselmann
433 fbf0262f Michael Hanselmann
      except CancelJob:
434 fbf0262f Michael Hanselmann
        queue.acquire()
435 fbf0262f Michael Hanselmann
        try:
436 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
437 fbf0262f Michael Hanselmann
        finally:
438 fbf0262f Michael Hanselmann
          queue.release()
439 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
440 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
441 85f03e0d Michael Hanselmann
      except:
442 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
443 e2715f69 Michael Hanselmann
    finally:
444 85f03e0d Michael Hanselmann
      queue.acquire()
445 85f03e0d Michael Hanselmann
      try:
446 65548ed5 Michael Hanselmann
        try:
447 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
448 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
449 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
450 65548ed5 Michael Hanselmann
        finally:
451 65548ed5 Michael Hanselmann
          job_id = job.id
452 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
453 85f03e0d Michael Hanselmann
      finally:
454 85f03e0d Michael Hanselmann
        queue.release()
455 d21d09d6 Iustin Pop
      logging.info("Worker %s finished job %s, status = %s",
456 d21d09d6 Iustin Pop
                   self.worker_id, job_id, status)
457 e2715f69 Michael Hanselmann
458 e2715f69 Michael Hanselmann
459 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
460 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
461 ea03467c Iustin Pop

462 ea03467c Iustin Pop
  """
463 5bdce580 Michael Hanselmann
  def __init__(self, queue):
464 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
465 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
466 5bdce580 Michael Hanselmann
    self.queue = queue
467 e2715f69 Michael Hanselmann
468 e2715f69 Michael Hanselmann
469 85f03e0d Michael Hanselmann
class JobQueue(object):
470 ea03467c Iustin Pop
  """Quue used to manaage the jobs.
471 ea03467c Iustin Pop

472 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
473 ea03467c Iustin Pop

474 ea03467c Iustin Pop
  """
475 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
476 f1da30e6 Michael Hanselmann
477 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
478 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
479 db37da70 Michael Hanselmann

480 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
481 ea03467c Iustin Pop
    functions usually called from other classes.
482 db37da70 Michael Hanselmann

483 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
484 db37da70 Michael Hanselmann

485 ea03467c Iustin Pop
    Example::
486 db37da70 Michael Hanselmann
      @utils.LockedMethod
487 db37da70 Michael Hanselmann
      @_RequireOpenQueue
488 db37da70 Michael Hanselmann
      def Example(self):
489 db37da70 Michael Hanselmann
        pass
490 db37da70 Michael Hanselmann

491 db37da70 Michael Hanselmann
    """
492 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
493 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
494 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
495 db37da70 Michael Hanselmann
    return wrapper
496 db37da70 Michael Hanselmann
497 85f03e0d Michael Hanselmann
  def __init__(self, context):
498 ea03467c Iustin Pop
    """Constructor for JobQueue.
499 ea03467c Iustin Pop

500 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
501 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
502 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
503 ea03467c Iustin Pop
    running).
504 ea03467c Iustin Pop

505 ea03467c Iustin Pop
    @type context: GanetiContext
506 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
507 ea03467c Iustin Pop
        data and other ganeti objects
508 ea03467c Iustin Pop

509 ea03467c Iustin Pop
    """
510 5bdce580 Michael Hanselmann
    self.context = context
511 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
512 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
513 f1da30e6 Michael Hanselmann
514 85f03e0d Michael Hanselmann
    # Locking
515 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
516 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
517 85f03e0d Michael Hanselmann
    self.release = self._lock.release
518 85f03e0d Michael Hanselmann
519 04ab05ce Michael Hanselmann
    # Initialize
520 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
521 f1da30e6 Michael Hanselmann
522 04ab05ce Michael Hanselmann
    # Read serial file
523 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
524 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
525 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
526 c4beba1c Iustin Pop
527 23752136 Michael Hanselmann
    # Get initial list of nodes
528 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
529 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
530 59303563 Iustin Pop
                       if n.master_candidate)
531 8e00939c Michael Hanselmann
532 8e00939c Michael Hanselmann
    # Remove master node
533 8e00939c Michael Hanselmann
    try:
534 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
535 33987705 Iustin Pop
    except KeyError:
536 8e00939c Michael Hanselmann
      pass
537 23752136 Michael Hanselmann
538 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
539 23752136 Michael Hanselmann
540 85f03e0d Michael Hanselmann
    # Setup worker pool
541 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
542 85f03e0d Michael Hanselmann
    try:
543 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
544 16714921 Michael Hanselmann
      # we're still doing our work.
545 16714921 Michael Hanselmann
      self.acquire()
546 16714921 Michael Hanselmann
      try:
547 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
548 711b5124 Michael Hanselmann
549 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
550 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
551 711b5124 Michael Hanselmann
        lastinfo = time.time()
552 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
553 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
554 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
555 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
556 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
557 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
558 711b5124 Michael Hanselmann
            lastinfo = time.time()
559 711b5124 Michael Hanselmann
560 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
561 711b5124 Michael Hanselmann
562 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
563 16714921 Michael Hanselmann
          if job is None:
564 16714921 Michael Hanselmann
            continue
565 94ed59a5 Iustin Pop
566 16714921 Michael Hanselmann
          status = job.CalcStatus()
567 85f03e0d Michael Hanselmann
568 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
569 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
570 85f03e0d Michael Hanselmann
571 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
572 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
573 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
574 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
575 16714921 Michael Hanselmann
            try:
576 16714921 Michael Hanselmann
              for op in job.ops:
577 16714921 Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
578 16714921 Michael Hanselmann
                op.result = "Unclean master daemon shutdown"
579 16714921 Michael Hanselmann
            finally:
580 16714921 Michael Hanselmann
              self.UpdateJobUnlocked(job)
581 711b5124 Michael Hanselmann
582 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
583 16714921 Michael Hanselmann
      finally:
584 16714921 Michael Hanselmann
        self.release()
585 16714921 Michael Hanselmann
    except:
586 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
587 16714921 Michael Hanselmann
      raise
588 85f03e0d Michael Hanselmann
589 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
590 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
591 99aabbed Iustin Pop
  def AddNode(self, node):
592 99aabbed Iustin Pop
    """Register a new node with the queue.
593 99aabbed Iustin Pop

594 99aabbed Iustin Pop
    @type node: L{objects.Node}
595 99aabbed Iustin Pop
    @param node: the node object to be added
596 99aabbed Iustin Pop

597 99aabbed Iustin Pop
    """
598 99aabbed Iustin Pop
    node_name = node.name
599 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
600 23752136 Michael Hanselmann
601 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
602 a3811745 Michael Hanselmann
    rpc.RpcRunner.call_jobqueue_purge(node_name)
603 23752136 Michael Hanselmann
604 59303563 Iustin Pop
    if not node.master_candidate:
605 59303563 Iustin Pop
      # remove if existing, ignoring errors
606 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
607 59303563 Iustin Pop
      # and skip the replication of the job ids
608 59303563 Iustin Pop
      return
609 59303563 Iustin Pop
610 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
611 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
612 23752136 Michael Hanselmann
613 d2e03a33 Michael Hanselmann
    # Upload current serial file
614 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
615 d2e03a33 Michael Hanselmann
616 d2e03a33 Michael Hanselmann
    for file_name in files:
617 9f774ee8 Michael Hanselmann
      # Read file content
618 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
619 9f774ee8 Michael Hanselmann
      try:
620 9f774ee8 Michael Hanselmann
        content = fd.read()
621 9f774ee8 Michael Hanselmann
      finally:
622 9f774ee8 Michael Hanselmann
        fd.close()
623 9f774ee8 Michael Hanselmann
624 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
625 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
626 a3811745 Michael Hanselmann
                                                  file_name, content)
627 d2e03a33 Michael Hanselmann
      if not result[node_name]:
628 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
629 d2e03a33 Michael Hanselmann
630 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
631 d2e03a33 Michael Hanselmann
632 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
633 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
634 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
635 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
636 ea03467c Iustin Pop

637 ea03467c Iustin Pop
    @type node_name: str
638 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
639 ea03467c Iustin Pop

640 ea03467c Iustin Pop
    """
641 23752136 Michael Hanselmann
    try:
642 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
643 99aabbed Iustin Pop
      del self._nodes[node_name]
644 d2e03a33 Michael Hanselmann
    except KeyError:
645 23752136 Michael Hanselmann
      pass
646 23752136 Michael Hanselmann
647 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
648 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
649 ea03467c Iustin Pop

650 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
651 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
652 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
653 ea03467c Iustin Pop

654 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
655 ea03467c Iustin Pop
    @type nodes: list
656 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
657 ea03467c Iustin Pop
    @type failmsg: str
658 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
659 ea03467c Iustin Pop

660 ea03467c Iustin Pop
    """
661 e74798c1 Michael Hanselmann
    failed = []
662 e74798c1 Michael Hanselmann
    success = []
663 e74798c1 Michael Hanselmann
664 e74798c1 Michael Hanselmann
    for node in nodes:
665 e74798c1 Michael Hanselmann
      if result[node]:
666 e74798c1 Michael Hanselmann
        success.append(node)
667 e74798c1 Michael Hanselmann
      else:
668 e74798c1 Michael Hanselmann
        failed.append(node)
669 e74798c1 Michael Hanselmann
670 e74798c1 Michael Hanselmann
    if failed:
671 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
672 e74798c1 Michael Hanselmann
673 e74798c1 Michael Hanselmann
    # +1 for the master node
674 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
675 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
676 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
677 e74798c1 Michael Hanselmann
678 99aabbed Iustin Pop
  def _GetNodeIp(self):
679 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
680 99aabbed Iustin Pop

681 ea03467c Iustin Pop
    @rtype: (list, list)
682 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
683 ea03467c Iustin Pop
        names and the second one with the node addresses
684 ea03467c Iustin Pop

685 99aabbed Iustin Pop
    """
686 99aabbed Iustin Pop
    name_list = self._nodes.keys()
687 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
688 99aabbed Iustin Pop
    return name_list, addr_list
689 99aabbed Iustin Pop
690 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
691 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
692 8e00939c Michael Hanselmann

693 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
694 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
695 ea03467c Iustin Pop

696 ea03467c Iustin Pop
    @type file_name: str
697 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
698 ea03467c Iustin Pop
    @type data: str
699 ea03467c Iustin Pop
    @param data: the new contents of the file
700 ea03467c Iustin Pop

701 8e00939c Michael Hanselmann
    """
702 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
703 8e00939c Michael Hanselmann
704 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
705 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
706 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
707 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
708 23752136 Michael Hanselmann
709 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
710 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
711 ea03467c Iustin Pop

712 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
713 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
714 ea03467c Iustin Pop

715 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
716 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
717 ea03467c Iustin Pop

718 ea03467c Iustin Pop
    """
719 dd875d32 Michael Hanselmann
    # Rename them locally
720 d7fd1f28 Michael Hanselmann
    for old, new in rename:
721 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
722 abc1f2ce Michael Hanselmann
723 dd875d32 Michael Hanselmann
    # ... and on all nodes
724 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
725 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
726 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
727 abc1f2ce Michael Hanselmann
728 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
729 ea03467c Iustin Pop
    """Convert a job ID to string format.
730 ea03467c Iustin Pop

731 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
732 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
733 ea03467c Iustin Pop
    abstract this change.
734 ea03467c Iustin Pop

735 ea03467c Iustin Pop
    @type job_id: int or long
736 ea03467c Iustin Pop
    @param job_id: the numeric job id
737 ea03467c Iustin Pop
    @rtype: str
738 ea03467c Iustin Pop
    @return: the formatted job id
739 ea03467c Iustin Pop

740 ea03467c Iustin Pop
    """
741 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
742 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
743 85f03e0d Michael Hanselmann
    if job_id < 0:
744 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
745 85f03e0d Michael Hanselmann
746 85f03e0d Michael Hanselmann
    return str(job_id)
747 85f03e0d Michael Hanselmann
748 58b22b6e Michael Hanselmann
  @classmethod
749 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
750 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
751 58b22b6e Michael Hanselmann

752 58b22b6e Michael Hanselmann
    @type job_id: str
753 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
754 58b22b6e Michael Hanselmann
    @rtype: str
755 58b22b6e Michael Hanselmann
    @return: Directory name
756 58b22b6e Michael Hanselmann

757 58b22b6e Michael Hanselmann
    """
758 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
759 58b22b6e Michael Hanselmann
760 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
761 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
762 f1da30e6 Michael Hanselmann

763 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
764 f1da30e6 Michael Hanselmann

765 ea03467c Iustin Pop
    @rtype: str
766 ea03467c Iustin Pop
    @return: a string representing the job identifier.
767 f1da30e6 Michael Hanselmann

768 f1da30e6 Michael Hanselmann
    """
769 f1da30e6 Michael Hanselmann
    # New number
770 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
771 f1da30e6 Michael Hanselmann
772 f1da30e6 Michael Hanselmann
    # Write to file
773 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
774 23752136 Michael Hanselmann
                                        "%s\n" % serial)
775 f1da30e6 Michael Hanselmann
776 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
777 f1da30e6 Michael Hanselmann
    self._last_serial = serial
778 f1da30e6 Michael Hanselmann
779 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
780 f1da30e6 Michael Hanselmann
781 85f03e0d Michael Hanselmann
  @staticmethod
782 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
783 ea03467c Iustin Pop
    """Returns the job file for a given job id.
784 ea03467c Iustin Pop

785 ea03467c Iustin Pop
    @type job_id: str
786 ea03467c Iustin Pop
    @param job_id: the job identifier
787 ea03467c Iustin Pop
    @rtype: str
788 ea03467c Iustin Pop
    @return: the path to the job file
789 ea03467c Iustin Pop

790 ea03467c Iustin Pop
    """
791 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
792 f1da30e6 Michael Hanselmann
793 58b22b6e Michael Hanselmann
  @classmethod
794 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
795 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
796 ea03467c Iustin Pop

797 ea03467c Iustin Pop
    @type job_id: str
798 ea03467c Iustin Pop
    @param job_id: the job identifier
799 ea03467c Iustin Pop
    @rtype: str
800 ea03467c Iustin Pop
    @return: the path to the archived job file
801 ea03467c Iustin Pop

802 ea03467c Iustin Pop
    """
803 58b22b6e Michael Hanselmann
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
804 58b22b6e Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
805 0cb94105 Michael Hanselmann
806 85f03e0d Michael Hanselmann
  @classmethod
807 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
808 ea03467c Iustin Pop
    """Extract the job id from a filename.
809 ea03467c Iustin Pop

810 ea03467c Iustin Pop
    @type name: str
811 ea03467c Iustin Pop
    @param name: the job filename
812 ea03467c Iustin Pop
    @rtype: job id or None
813 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
814 ea03467c Iustin Pop
        or None if the filename does not represent a valid
815 ea03467c Iustin Pop
        job file
816 ea03467c Iustin Pop

817 ea03467c Iustin Pop
    """
818 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
819 fae737ac Michael Hanselmann
    if m:
820 fae737ac Michael Hanselmann
      return m.group(1)
821 fae737ac Michael Hanselmann
    else:
822 fae737ac Michael Hanselmann
      return None
823 fae737ac Michael Hanselmann
824 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
825 911a495b Iustin Pop
    """Return all known job IDs.
826 911a495b Iustin Pop

827 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
828 911a495b Iustin Pop
    included. Currently this argument is unused.
829 911a495b Iustin Pop

830 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
831 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
832 ac0930b9 Iustin Pop
    extra IDs).
833 ac0930b9 Iustin Pop

834 ea03467c Iustin Pop
    @rtype: list
835 ea03467c Iustin Pop
    @return: the list of job IDs
836 ea03467c Iustin Pop

837 911a495b Iustin Pop
    """
838 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
839 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
840 f0d874fe Iustin Pop
    return jlist
841 911a495b Iustin Pop
842 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
843 ea03467c Iustin Pop
    """Returns the list of current job files.
844 ea03467c Iustin Pop

845 ea03467c Iustin Pop
    @rtype: list
846 ea03467c Iustin Pop
    @return: the list of job file names
847 ea03467c Iustin Pop

848 ea03467c Iustin Pop
    """
849 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
850 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
851 f1da30e6 Michael Hanselmann
852 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
853 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
854 ea03467c Iustin Pop

855 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
856 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
857 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
858 ea03467c Iustin Pop

859 ea03467c Iustin Pop
    @param job_id: the job id
860 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
861 ea03467c Iustin Pop
    @return: either None or the job object
862 ea03467c Iustin Pop

863 ea03467c Iustin Pop
    """
864 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
865 5685c1a5 Michael Hanselmann
    if job:
866 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
867 5685c1a5 Michael Hanselmann
      return job
868 ac0930b9 Iustin Pop
869 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
870 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
871 f1da30e6 Michael Hanselmann
    try:
872 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
873 f1da30e6 Michael Hanselmann
    except IOError, err:
874 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
875 f1da30e6 Michael Hanselmann
        return None
876 f1da30e6 Michael Hanselmann
      raise
877 f1da30e6 Michael Hanselmann
    try:
878 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
879 f1da30e6 Michael Hanselmann
    finally:
880 f1da30e6 Michael Hanselmann
      fd.close()
881 f1da30e6 Michael Hanselmann
882 94ed59a5 Iustin Pop
    try:
883 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
884 94ed59a5 Iustin Pop
    except Exception, err:
885 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
886 94ed59a5 Iustin Pop
      if filepath == new_path:
887 94ed59a5 Iustin Pop
        # job already archived (future case)
888 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
889 94ed59a5 Iustin Pop
      else:
890 94ed59a5 Iustin Pop
        # non-archived case
891 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
892 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
893 94ed59a5 Iustin Pop
      return None
894 94ed59a5 Iustin Pop
895 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
896 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
897 ac0930b9 Iustin Pop
    return job
898 f1da30e6 Michael Hanselmann
899 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
900 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
901 ea03467c Iustin Pop

902 ea03467c Iustin Pop
    @type job_ids: list
903 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
904 ea03467c Iustin Pop
        or a list of job IDs
905 ea03467c Iustin Pop
    @rtype: list
906 ea03467c Iustin Pop
    @return: the list of job objects
907 ea03467c Iustin Pop

908 ea03467c Iustin Pop
    """
909 911a495b Iustin Pop
    if not job_ids:
910 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
911 f1da30e6 Michael Hanselmann
912 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
913 f1da30e6 Michael Hanselmann
914 686d7433 Iustin Pop
  @staticmethod
915 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
916 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
917 686d7433 Iustin Pop

918 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
919 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
920 686d7433 Iustin Pop

921 ea03467c Iustin Pop
    @rtype: boolean
922 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
923 ea03467c Iustin Pop

924 686d7433 Iustin Pop
    """
925 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
926 686d7433 Iustin Pop
927 3ccafd0e Iustin Pop
  @staticmethod
928 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
929 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
930 3ccafd0e Iustin Pop

931 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
932 3ccafd0e Iustin Pop
    and in the future we might merge them.
933 3ccafd0e Iustin Pop

934 ea03467c Iustin Pop
    @type drain_flag: boolean
935 ea03467c Iustin Pop
    @param drain_flag: wheter to set or unset the drain flag
936 ea03467c Iustin Pop

937 3ccafd0e Iustin Pop
    """
938 3ccafd0e Iustin Pop
    if drain_flag:
939 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
940 3ccafd0e Iustin Pop
    else:
941 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
942 3ccafd0e Iustin Pop
    return True
943 3ccafd0e Iustin Pop
944 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
945 db37da70 Michael Hanselmann
  @_RequireOpenQueue
946 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
947 85f03e0d Michael Hanselmann
    """Create and store a new job.
948 f1da30e6 Michael Hanselmann

949 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
950 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
951 c3f0a12f Iustin Pop

952 c3f0a12f Iustin Pop
    @type ops: list
953 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
954 ea03467c Iustin Pop
    @rtype: job ID
955 ea03467c Iustin Pop
    @return: the job ID of the newly created job
956 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
957 c3f0a12f Iustin Pop

958 c3f0a12f Iustin Pop
    """
959 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
960 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
961 f87b405e Michael Hanselmann
962 f87b405e Michael Hanselmann
    # Check job queue size
963 f87b405e Michael Hanselmann
    size = len(self._ListJobFiles())
964 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
965 f87b405e Michael Hanselmann
      # TODO: Autoarchive jobs. Make sure it's not done on every job
966 f87b405e Michael Hanselmann
      # submission, though.
967 f87b405e Michael Hanselmann
      #size = ...
968 f87b405e Michael Hanselmann
      pass
969 f87b405e Michael Hanselmann
970 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
971 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
972 f87b405e Michael Hanselmann
973 f1da30e6 Michael Hanselmann
    # Get job identifier
974 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
975 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
976 f1da30e6 Michael Hanselmann
977 f1da30e6 Michael Hanselmann
    # Write to disk
978 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
979 f1da30e6 Michael Hanselmann
980 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
981 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
982 ac0930b9 Iustin Pop
983 85f03e0d Michael Hanselmann
    # Add to worker pool
984 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
985 85f03e0d Michael Hanselmann
986 85f03e0d Michael Hanselmann
    return job.id
987 f1da30e6 Michael Hanselmann
988 db37da70 Michael Hanselmann
  @_RequireOpenQueue
989 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
990 ea03467c Iustin Pop
    """Update a job's on disk storage.
991 ea03467c Iustin Pop

992 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
993 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
994 ea03467c Iustin Pop
    nodes.
995 ea03467c Iustin Pop

996 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
997 ea03467c Iustin Pop
    @param job: the changed job
998 ea03467c Iustin Pop

999 ea03467c Iustin Pop
    """
1000 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1001 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1002 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1003 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
1004 ac0930b9 Iustin Pop
1005 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
1006 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
1007 dfe57c22 Michael Hanselmann
1008 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
1009 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
1010 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1011 5c735209 Iustin Pop
                        timeout):
1012 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1013 6c5a7090 Michael Hanselmann

1014 6c5a7090 Michael Hanselmann
    @type job_id: string
1015 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1016 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1017 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1018 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1019 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1020 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1021 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1022 5c735209 Iustin Pop
    @type timeout: float
1023 5c735209 Iustin Pop
    @param timeout: maximum time to wait
1024 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1025 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1026 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1027 ea03467c Iustin Pop

1028 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1029 ea03467c Iustin Pop
        we instead return a special value,
1030 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1031 ea03467c Iustin Pop
        as such by the clients
1032 6c5a7090 Michael Hanselmann

1033 6c5a7090 Michael Hanselmann
    """
1034 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
1035 5c735209 Iustin Pop
    end_time = time.time() + timeout
1036 dfe57c22 Michael Hanselmann
    while True:
1037 5c735209 Iustin Pop
      delta_time = end_time - time.time()
1038 5c735209 Iustin Pop
      if delta_time < 0:
1039 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
1040 5c735209 Iustin Pop
1041 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1042 6c5a7090 Michael Hanselmann
      if not job:
1043 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
1044 6c5a7090 Michael Hanselmann
        break
1045 dfe57c22 Michael Hanselmann
1046 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
1047 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
1048 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
1049 dfe57c22 Michael Hanselmann
1050 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
1051 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
1052 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
1053 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
1054 dfe57c22 Michael Hanselmann
      # significantly different.
1055 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1056 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1057 dfe57c22 Michael Hanselmann
1058 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
1059 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
1060 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
1061 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
1062 6c5a7090 Michael Hanselmann
        # no changes.
1063 dfe57c22 Michael Hanselmann
        break
1064 dfe57c22 Michael Hanselmann
1065 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
1066 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
1067 6c5a7090 Michael Hanselmann
        break
1068 6c5a7090 Michael Hanselmann
1069 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
1070 6c5a7090 Michael Hanselmann
1071 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
1072 5c735209 Iustin Pop
      job.change.wait(delta_time)
1073 dfe57c22 Michael Hanselmann
1074 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
1075 dfe57c22 Michael Hanselmann
1076 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
1077 dfe57c22 Michael Hanselmann
1078 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1079 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1080 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1081 188c5e0a Michael Hanselmann
    """Cancels a job.
1082 188c5e0a Michael Hanselmann

1083 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1084 ea03467c Iustin Pop

1085 188c5e0a Michael Hanselmann
    @type job_id: string
1086 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1087 188c5e0a Michael Hanselmann

1088 188c5e0a Michael Hanselmann
    """
1089 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1090 188c5e0a Michael Hanselmann
1091 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1092 188c5e0a Michael Hanselmann
    if not job:
1093 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1094 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1095 fbf0262f Michael Hanselmann
1096 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1097 188c5e0a Michael Hanselmann
1098 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1099 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1100 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
1101 fbf0262f Michael Hanselmann
      return (False, "Job %s is no longer in the queue" % job.id)
1102 fbf0262f Michael Hanselmann
1103 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1104 fbf0262f Michael Hanselmann
      self.CancelJobUnlocked(job)
1105 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1106 188c5e0a Michael Hanselmann
1107 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1108 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1109 fbf0262f Michael Hanselmann
      try:
1110 fbf0262f Michael Hanselmann
        for op in job.ops:
1111 fbf0262f Michael Hanselmann
          op.status = constants.OP_STATUS_CANCELING
1112 fbf0262f Michael Hanselmann
      finally:
1113 fbf0262f Michael Hanselmann
        self.UpdateJobUnlocked(job)
1114 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1115 fbf0262f Michael Hanselmann
1116 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1117 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
1118 fbf0262f Michael Hanselmann
    """Marks a job as canceled.
1119 fbf0262f Michael Hanselmann

1120 fbf0262f Michael Hanselmann
    """
1121 85f03e0d Michael Hanselmann
    try:
1122 85f03e0d Michael Hanselmann
      for op in job.ops:
1123 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1124 fbf0262f Michael Hanselmann
        op.result = "Job canceled by request"
1125 85f03e0d Michael Hanselmann
    finally:
1126 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1127 188c5e0a Michael Hanselmann
1128 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1129 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1130 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1131 c609f802 Michael Hanselmann

1132 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1133 25e7b43f Iustin Pop
    @param jobs: Job objects
1134 d7fd1f28 Michael Hanselmann
    @rtype: int
1135 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1136 c609f802 Michael Hanselmann

1137 c609f802 Michael Hanselmann
    """
1138 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1139 d7fd1f28 Michael Hanselmann
    rename_files = []
1140 d7fd1f28 Michael Hanselmann
    for job in jobs:
1141 d7fd1f28 Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1142 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_SUCCESS,
1143 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_ERROR):
1144 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1145 d7fd1f28 Michael Hanselmann
        continue
1146 c609f802 Michael Hanselmann
1147 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1148 c609f802 Michael Hanselmann
1149 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1150 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1151 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1152 c609f802 Michael Hanselmann
1153 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1154 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1155 f1da30e6 Michael Hanselmann
1156 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1157 d7fd1f28 Michael Hanselmann
                  ", ".join(job.id for job in archive_jobs))
1158 d7fd1f28 Michael Hanselmann
1159 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1160 78d12585 Michael Hanselmann
1161 07cd723a Iustin Pop
  @utils.LockedMethod
1162 07cd723a Iustin Pop
  @_RequireOpenQueue
1163 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1164 07cd723a Iustin Pop
    """Archives a job.
1165 07cd723a Iustin Pop

1166 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1167 ea03467c Iustin Pop

1168 07cd723a Iustin Pop
    @type job_id: string
1169 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1170 78d12585 Michael Hanselmann
    @rtype: bool
1171 78d12585 Michael Hanselmann
    @return: Whether job was archived
1172 07cd723a Iustin Pop

1173 07cd723a Iustin Pop
    """
1174 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1175 78d12585 Michael Hanselmann
1176 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1177 78d12585 Michael Hanselmann
    if not job:
1178 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1179 78d12585 Michael Hanselmann
      return False
1180 78d12585 Michael Hanselmann
1181 d7fd1f28 Michael Hanselmann
    return self._ArchiveJobUnlocked([job]) == 1
1182 07cd723a Iustin Pop
1183 07cd723a Iustin Pop
  @utils.LockedMethod
1184 07cd723a Iustin Pop
  @_RequireOpenQueue
1185 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1186 07cd723a Iustin Pop
    """Archives all jobs based on age.
1187 07cd723a Iustin Pop

1188 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1189 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1190 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1191 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1192 07cd723a Iustin Pop

1193 07cd723a Iustin Pop
    @type age: int
1194 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1195 07cd723a Iustin Pop

1196 07cd723a Iustin Pop
    """
1197 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1198 07cd723a Iustin Pop
1199 07cd723a Iustin Pop
    now = time.time()
1200 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1201 f8ad5591 Michael Hanselmann
    archived_count = 0
1202 f8ad5591 Michael Hanselmann
    last_touched = 0
1203 f8ad5591 Michael Hanselmann
1204 f8ad5591 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1205 d7fd1f28 Michael Hanselmann
    pending = []
1206 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1207 f8ad5591 Michael Hanselmann
      last_touched = idx
1208 f8ad5591 Michael Hanselmann
1209 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1210 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1211 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1212 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1213 f8ad5591 Michael Hanselmann
        break
1214 f8ad5591 Michael Hanselmann
1215 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1216 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1217 f8ad5591 Michael Hanselmann
      if job:
1218 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1219 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1220 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1221 f8ad5591 Michael Hanselmann
          else:
1222 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1223 07cd723a Iustin Pop
        else:
1224 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1225 f8ad5591 Michael Hanselmann
1226 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1227 d7fd1f28 Michael Hanselmann
          pending.append(job)
1228 d7fd1f28 Michael Hanselmann
1229 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1230 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1231 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1232 d7fd1f28 Michael Hanselmann
            pending = []
1233 f8ad5591 Michael Hanselmann
1234 d7fd1f28 Michael Hanselmann
    if pending:
1235 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1236 07cd723a Iustin Pop
1237 f8ad5591 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched - 1)
1238 07cd723a Iustin Pop
1239 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1240 ea03467c Iustin Pop
    """Returns information about a job.
1241 ea03467c Iustin Pop

1242 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1243 ea03467c Iustin Pop
    @param job: the job which we query
1244 ea03467c Iustin Pop
    @type fields: list
1245 ea03467c Iustin Pop
    @param fields: names of fields to return
1246 ea03467c Iustin Pop
    @rtype: list
1247 ea03467c Iustin Pop
    @return: list with one element for each field
1248 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1249 ea03467c Iustin Pop
        has been passed
1250 ea03467c Iustin Pop

1251 ea03467c Iustin Pop
    """
1252 e2715f69 Michael Hanselmann
    row = []
1253 e2715f69 Michael Hanselmann
    for fname in fields:
1254 e2715f69 Michael Hanselmann
      if fname == "id":
1255 e2715f69 Michael Hanselmann
        row.append(job.id)
1256 e2715f69 Michael Hanselmann
      elif fname == "status":
1257 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1258 af30b2fd Michael Hanselmann
      elif fname == "ops":
1259 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1260 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1261 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1262 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1263 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1264 5b23c34c Iustin Pop
      elif fname == "oplog":
1265 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1266 c56ec146 Iustin Pop
      elif fname == "opstart":
1267 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1268 c56ec146 Iustin Pop
      elif fname == "opend":
1269 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1270 c56ec146 Iustin Pop
      elif fname == "received_ts":
1271 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1272 c56ec146 Iustin Pop
      elif fname == "start_ts":
1273 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1274 c56ec146 Iustin Pop
      elif fname == "end_ts":
1275 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1276 60dd1473 Iustin Pop
      elif fname == "summary":
1277 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1278 e2715f69 Michael Hanselmann
      else:
1279 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1280 e2715f69 Michael Hanselmann
    return row
1281 e2715f69 Michael Hanselmann
1282 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1283 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1284 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1285 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1286 e2715f69 Michael Hanselmann

1287 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1288 ea03467c Iustin Pop
    processing for each job.
1289 ea03467c Iustin Pop

1290 ea03467c Iustin Pop
    @type job_ids: list
1291 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1292 ea03467c Iustin Pop
    @type fields: list
1293 ea03467c Iustin Pop
    @param fields: names of fields to return
1294 ea03467c Iustin Pop
    @rtype: list
1295 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1296 ea03467c Iustin Pop
        the requested fields
1297 e2715f69 Michael Hanselmann

1298 e2715f69 Michael Hanselmann
    """
1299 85f03e0d Michael Hanselmann
    jobs = []
1300 e2715f69 Michael Hanselmann
1301 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1302 85f03e0d Michael Hanselmann
      if job is None:
1303 85f03e0d Michael Hanselmann
        jobs.append(None)
1304 85f03e0d Michael Hanselmann
      else:
1305 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1306 e2715f69 Michael Hanselmann
1307 85f03e0d Michael Hanselmann
    return jobs
1308 e2715f69 Michael Hanselmann
1309 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1310 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1311 e2715f69 Michael Hanselmann
  def Shutdown(self):
1312 e2715f69 Michael Hanselmann
    """Stops the job queue.
1313 e2715f69 Michael Hanselmann

1314 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1315 ea03467c Iustin Pop

1316 e2715f69 Michael Hanselmann
    """
1317 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1318 85f03e0d Michael Hanselmann
1319 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1320 04ab05ce Michael Hanselmann
    self._queue_lock = None