Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 23f06b2b

History | View | Annotate | Download (37.9 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
# Number of worker threads started for processing jobs; passed to the
# worker pool constructor in _JobQueueWorkerPool
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
# NOTE(review): presumably the number of archived jobs stored per archive
# directory; the code using this constant is not visible here -- confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception used to request the cancellation of a job.

  It is raised while a job is being processed and caught in
  L{_JobQueueWorker.RunTask}, which then cancels the job through the
  queue.

  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
  """Return the current time as a split timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object together with its execution state.

  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    The new object starts in the queued status, with no result, an
    empty log and no timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The C{input}, C{status}, C{result} and C{log} keys are mandatory;
    missing timestamps are restored as C{None}.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # Bypass __init__, all attributes come from the serialized state
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    for key in ("status", "result", "log"):
      setattr(restored, key, state[key])
    for key in ("start_timestamp", "end_timestamp"):
      setattr(restored, key, state.get(key))
    return restored

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    serialized = {"input": self.input.__getstate__()}
    for key in ("status", "result", "log",
                "start_timestamp", "end_timestamp"):
      serialized[key] = getattr(self, key)
    return serialized
131 f1048938 Iustin Pop
132 e2715f69 Michael Hanselmann
133 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: the serial of the most recent log entry (0 if
      nothing has been logged yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if the list of opcodes is empty

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes; it shares the queue's lock
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # Bypass __init__, all attributes come from the serialized state
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The log serial is not serialized; recompute it as the highest
      # serial found in any of the opcode logs
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes; it shares the queue's lock
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped
      - the first opcode found to be canceling, canceled or finished
        with error determines the job status and stops the iteration
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status (one of the C{constants.JOB_STATUS_*} values)

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status constant here, not the opcode one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected, in opcode order

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries
306 6c5a7090 Michael Hanselmann
307 f1048938 Iustin Pop
308 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
309 ea03467c Iustin Pop
  """The actual job workers.
310 ea03467c Iustin Pop

311 ea03467c Iustin Pop
  """
312 e92376d7 Iustin Pop
  def _NotifyStart(self):
313 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
314 e92376d7 Iustin Pop

315 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
316 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
317 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
318 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319 e92376d7 Iustin Pop

320 e92376d7 Iustin Pop
    """
321 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
322 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
323 e92376d7 Iustin Pop
324 e92376d7 Iustin Pop
    self.queue.acquire()
325 e92376d7 Iustin Pop
    try:
326 fbf0262f Michael Hanselmann
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327 fbf0262f Michael Hanselmann
                                    constants.OP_STATUS_CANCELING)
328 fbf0262f Michael Hanselmann
329 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
330 fbf0262f Michael Hanselmann
      if self.opcode.status == constants.OP_STATUS_CANCELING:
331 fbf0262f Michael Hanselmann
        raise CancelJob()
332 fbf0262f Michael Hanselmann
333 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
334 e92376d7 Iustin Pop
    finally:
335 e92376d7 Iustin Pop
      self.queue.release()
336 e92376d7 Iustin Pop
337 85f03e0d Michael Hanselmann
  def RunTask(self, job):
338 e2715f69 Michael Hanselmann
    """Job executor.
339 e2715f69 Michael Hanselmann

340 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
341 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
342 e2715f69 Michael Hanselmann

343 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
344 ea03467c Iustin Pop
    @param job: the job to be processed
345 ea03467c Iustin Pop

346 e2715f69 Michael Hanselmann
    """
347 d21d09d6 Iustin Pop
    logging.info("Worker %s processing job %s",
348 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
349 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
350 e92376d7 Iustin Pop
    self.queue = queue = job.queue
351 e2715f69 Michael Hanselmann
    try:
352 85f03e0d Michael Hanselmann
      try:
353 85f03e0d Michael Hanselmann
        count = len(job.ops)
354 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
355 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
356 85f03e0d Michael Hanselmann
          try:
357 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
358 d21d09d6 Iustin Pop
                         op_summary)
359 85f03e0d Michael Hanselmann
360 85f03e0d Michael Hanselmann
            queue.acquire()
361 85f03e0d Michael Hanselmann
            try:
362 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
363 df0fb067 Iustin Pop
                raise CancelJob()
364 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
365 85f03e0d Michael Hanselmann
              job.run_op_index = idx
366 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
367 85f03e0d Michael Hanselmann
              op.result = None
368 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
369 c56ec146 Iustin Pop
              if idx == 0: # first opcode
370 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
371 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
372 85f03e0d Michael Hanselmann
373 38206f3c Iustin Pop
              input_opcode = op.input
374 85f03e0d Michael Hanselmann
            finally:
375 85f03e0d Michael Hanselmann
              queue.release()
376 85f03e0d Michael Hanselmann
377 dfe57c22 Michael Hanselmann
            def _Log(*args):
378 6c5a7090 Michael Hanselmann
              """Append a log entry.
379 6c5a7090 Michael Hanselmann

380 6c5a7090 Michael Hanselmann
              """
381 6c5a7090 Michael Hanselmann
              assert len(args) < 3
382 6c5a7090 Michael Hanselmann
383 6c5a7090 Michael Hanselmann
              if len(args) == 1:
384 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
385 6c5a7090 Michael Hanselmann
                log_msg = args[0]
386 6c5a7090 Michael Hanselmann
              else:
387 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
388 6c5a7090 Michael Hanselmann
389 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
390 6c5a7090 Michael Hanselmann
              # precision.
391 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
392 dfe57c22 Michael Hanselmann
393 6c5a7090 Michael Hanselmann
              queue.acquire()
394 dfe57c22 Michael Hanselmann
              try:
395 6c5a7090 Michael Hanselmann
                job.log_serial += 1
396 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
397 6c5a7090 Michael Hanselmann
398 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
399 dfe57c22 Michael Hanselmann
              finally:
400 6c5a7090 Michael Hanselmann
                queue.release()
401 dfe57c22 Michael Hanselmann
402 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
403 e92376d7 Iustin Pop
            self.opcode = op
404 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
405 85f03e0d Michael Hanselmann
406 85f03e0d Michael Hanselmann
            queue.acquire()
407 85f03e0d Michael Hanselmann
            try:
408 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
409 85f03e0d Michael Hanselmann
              op.result = result
410 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
411 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
412 85f03e0d Michael Hanselmann
            finally:
413 85f03e0d Michael Hanselmann
              queue.release()
414 85f03e0d Michael Hanselmann
415 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
416 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
417 fbf0262f Michael Hanselmann
          except CancelJob:
418 fbf0262f Michael Hanselmann
            # Will be handled further up
419 fbf0262f Michael Hanselmann
            raise
420 85f03e0d Michael Hanselmann
          except Exception, err:
421 85f03e0d Michael Hanselmann
            queue.acquire()
422 85f03e0d Michael Hanselmann
            try:
423 85f03e0d Michael Hanselmann
              try:
424 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
425 85f03e0d Michael Hanselmann
                op.result = str(err)
426 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
427 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
428 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
429 85f03e0d Michael Hanselmann
              finally:
430 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
431 85f03e0d Michael Hanselmann
            finally:
432 85f03e0d Michael Hanselmann
              queue.release()
433 85f03e0d Michael Hanselmann
            raise
434 85f03e0d Michael Hanselmann
435 fbf0262f Michael Hanselmann
      except CancelJob:
436 fbf0262f Michael Hanselmann
        queue.acquire()
437 fbf0262f Michael Hanselmann
        try:
438 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
439 fbf0262f Michael Hanselmann
        finally:
440 fbf0262f Michael Hanselmann
          queue.release()
441 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
442 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
443 85f03e0d Michael Hanselmann
      except:
444 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
445 e2715f69 Michael Hanselmann
    finally:
446 85f03e0d Michael Hanselmann
      queue.acquire()
447 85f03e0d Michael Hanselmann
      try:
448 65548ed5 Michael Hanselmann
        try:
449 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
450 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
451 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
452 65548ed5 Michael Hanselmann
        finally:
453 65548ed5 Michael Hanselmann
          job_id = job.id
454 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
455 85f03e0d Michael Hanselmann
      finally:
456 85f03e0d Michael Hanselmann
        queue.release()
457 d21d09d6 Iustin Pop
      logging.info("Worker %s finished job %s, status = %s",
458 d21d09d6 Iustin Pop
                   self.worker_id, job_id, status)
459 e2715f69 Michael Hanselmann
460 e2715f69 Michael Hanselmann
461 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} workers for a job queue.

  @ivar queue: the queue this pool was created for

  """
  def __init__(self, queue):
    """Set up a pool of C{JOBQUEUE_THREADS} job workers.

    @param queue: the queue this pool belongs to

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
469 e2715f69 Michael Hanselmann
470 e2715f69 Michael Hanselmann
471 85f03e0d Michael Hanselmann
class JobQueue(object):
472 ea03467c Iustin Pop
  """Queue used to manage the jobs.
473 ea03467c Iustin Pop

474 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
475 ea03467c Iustin Pop

476 ea03467c Iustin Pop
  """
477 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
478 f1da30e6 Michael Hanselmann
479 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
480 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
481 db37da70 Michael Hanselmann

482 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
483 ea03467c Iustin Pop
    functions usually called from other classes.
484 db37da70 Michael Hanselmann

485 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
486 db37da70 Michael Hanselmann

487 ea03467c Iustin Pop
    Example::
488 db37da70 Michael Hanselmann
      @utils.LockedMethod
489 db37da70 Michael Hanselmann
      @_RequireOpenQueue
490 db37da70 Michael Hanselmann
      def Example(self):
491 db37da70 Michael Hanselmann
        pass
492 db37da70 Michael Hanselmann

493 db37da70 Michael Hanselmann
    """
494 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
495 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
496 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
497 db37da70 Michael Hanselmann
    return wrapper
498 db37da70 Michael Hanselmann
499 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the locking, opens the on-disk queue, reads the last serial
    number, collects the master candidate nodes (without this node) and
    creates the worker pool. Afterwards all jobs found on disk are
    loaded: jobs which are still queued are handed to the worker pool,
    while for jobs which were running, waiting for locks or being
    canceled all opcodes are set to the error status.

    If anything fails while inspecting the jobs, the workers are
    terminated and the exception is re-raised.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (master candidates only)
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # Remove master node, if it is in the list at all
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        last_idx = jobs_count - 1
        unfinished = (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING)

        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          progress_due = (idx % 1000 == 0 or
                          time.time() >= (lastinfo + 10.0) or
                          idx == last_idx)
          if progress_due:
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, last_idx, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status == constants.JOB_STATUS_QUEUED:
            self._wpool.AddTask(job)
            continue

          if status not in unfinished:
            continue

          # The job was being worked on when the previous process went away
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise
590 85f03e0d Michael Hanselmann
591 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A node which is not
    a master candidate is then only removed from the list of nodes (if
    it was there). For a master candidate all current job files and the
    serial file are uploaded, and the node is recorded in the list of
    nodes together with its primary IP.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs...
    upload = []
    for job_id in self._GetJobIDsUnlocked():
      upload.append(self._GetJobPath(job_id))

    # ... and the current serial file
    upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload:
      # Read file content
      handle = open(path, "r")
      try:
        content = handle.read()
      finally:
        handle.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  path, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", path, node_name)

    self._nodes[node_name] = node.primary_ip
633 d2e03a33 Michael Hanselmann
634 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    The node is dropped from the list of nodes; it's not an error if the
    node is not in the list.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)
648 23752136 Michael Hanselmann
649 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
650 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
651 ea03467c Iustin Pop

652 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
653 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
654 ea03467c Iustin Pop
    log the case when more than half of the nodes failes.
655 ea03467c Iustin Pop

656 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
657 ea03467c Iustin Pop
    @type nodes: list
658 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
659 ea03467c Iustin Pop
    @type failmsg: str
660 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
661 ea03467c Iustin Pop

662 ea03467c Iustin Pop
    """
663 e74798c1 Michael Hanselmann
    failed = []
664 e74798c1 Michael Hanselmann
    success = []
665 e74798c1 Michael Hanselmann
666 e74798c1 Michael Hanselmann
    for node in nodes:
667 e74798c1 Michael Hanselmann
      if result[node]:
668 e74798c1 Michael Hanselmann
        success.append(node)
669 e74798c1 Michael Hanselmann
      else:
670 e74798c1 Michael Hanselmann
        failed.append(node)
671 e74798c1 Michael Hanselmann
672 e74798c1 Michael Hanselmann
    if failed:
673 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
674 e74798c1 Michael Hanselmann
675 e74798c1 Michael Hanselmann
    # +1 for the master node
676 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
677 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
678 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
679 e74798c1 Michael Hanselmann
680 99aabbed Iustin Pop
  def _GetNodeIp(self):
681 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
682 99aabbed Iustin Pop

683 ea03467c Iustin Pop
    @rtype: (list, list)
684 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
685 ea03467c Iustin Pop
        names and the second one with the node addresses
686 ea03467c Iustin Pop

687 99aabbed Iustin Pop
    """
688 99aabbed Iustin Pop
    name_list = self._nodes.keys()
689 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
690 99aabbed Iustin Pop
    return name_list, addr_list
691 99aabbed Iustin Pop
692 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The file is first written on the local node, then sent to all
    nodes in our node list via RPC; failures of the RPC call are only
    logged.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    # Local copy first
    utils.WriteFile(file_name, data=data)

    # ... then all the other nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    failmsg = "Updating %s" % file_name
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
710 23752136 Michael Hanselmann
711 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All the renames are done in the local queue directory first
    (creating the target directories if needed); afterwards the same
    list is sent to all the other nodes we have. Failures of the RPC
    call are only logged.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (old_path, new_path) in rename:
      utils.RenameFile(old_path, new_path, mkdir=True)

    # ... and on all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    failmsg = "Renaming files (%r)" % rename
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
729 abc1f2ce Michael Hanselmann
730 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or
        if it is negative

    """
    if isinstance(job_id, (int, long)):
      if job_id < 0:
        raise errors.ProgrammerError("Job ID %s is negative" % job_id)
      return str(job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
749 85f03e0d Michael Hanselmann
750 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    The directory name is the job ID divided by
    C{JOBS_PER_ARCHIVE_DIRECTORY}, rounded down.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name
    @raise ValueError: if the job ID can't be converted to an integer

    """
    # Floor division is requested explicitly; with "/" the result would
    # become a float (e.g. "0.0042") as soon as true division is in effect
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
761 58b22b6e Michael Hanselmann
762 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster. The
    new serial number is written to the serial file (and replicated)
    before it is remembered in memory.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    next_serial = self._last_serial + 1

    # Write to file
    contents = "%s\n" % next_serial
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        contents)

    # Keep it only if we were able to write the file
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
782 f1da30e6 Michael Hanselmann
783 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<id>} and is located in the queue
    directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)
794 f1da30e6 Michael Hanselmann
795 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<id>} and is located in the job's archive
    directory (see L{_GetArchiveDirectory}) below the queue archive
    directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    relative_path = "%s/job-%s" % (archive_subdir, job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, relative_path)
807 0cb94105 Michael Hanselmann
808 85f03e0d Michael Hanselmann
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    The name is matched against L{_RE_JOB_FILE}; the job id is the
    first group of that expression.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    matched = cls._RE_JOB_FILE.match(name)
    if not matched:
      return None
    return matched.group(1)
825 fae737ac Michael Hanselmann
826 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type archived: boolean
    @param archived: currently ignored
    @rtype: list
    @return: the list of job IDs, as sorted by L{utils.NiceSort}

    """
    found = []
    for name in self._ListJobFiles():
      found.append(self._ExtractJobID(name))
    return utils.NiceSort(found)
843 911a495b Iustin Pop
844 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
    """Returns the list of current job files.

    Only the visible files in the queue directory whose name matches
    L{_RE_JOB_FILE} are returned.

    @rtype: list
    @return: the list of job file names

    """
    job_files = []
    for name in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(name):
        job_files.append(name)
    return job_files
853 f1da30e6 Michael Hanselmann
854 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
855 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
856 ea03467c Iustin Pop

857 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
858 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
859 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
860 ea03467c Iustin Pop

861 ea03467c Iustin Pop
    @param job_id: the job id
862 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
863 ea03467c Iustin Pop
    @return: either None or the job object
864 ea03467c Iustin Pop

865 ea03467c Iustin Pop
    """
866 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
867 5685c1a5 Michael Hanselmann
    if job:
868 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
869 5685c1a5 Michael Hanselmann
      return job
870 ac0930b9 Iustin Pop
871 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
872 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
873 f1da30e6 Michael Hanselmann
    try:
874 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
875 f1da30e6 Michael Hanselmann
    except IOError, err:
876 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
877 f1da30e6 Michael Hanselmann
        return None
878 f1da30e6 Michael Hanselmann
      raise
879 f1da30e6 Michael Hanselmann
    try:
880 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
881 f1da30e6 Michael Hanselmann
    finally:
882 f1da30e6 Michael Hanselmann
      fd.close()
883 f1da30e6 Michael Hanselmann
884 94ed59a5 Iustin Pop
    try:
885 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
886 94ed59a5 Iustin Pop
    except Exception, err:
887 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
888 94ed59a5 Iustin Pop
      if filepath == new_path:
889 94ed59a5 Iustin Pop
        # job already archived (future case)
890 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
891 94ed59a5 Iustin Pop
      else:
892 94ed59a5 Iustin Pop
        # non-archived case
893 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
894 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
895 94ed59a5 Iustin Pop
      return None
896 94ed59a5 Iustin Pop
897 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
898 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
899 ac0930b9 Iustin Pop
    return job
900 f1da30e6 Michael Hanselmann
901 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
902 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
903 ea03467c Iustin Pop

904 ea03467c Iustin Pop
    @type job_ids: list
905 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
906 ea03467c Iustin Pop
        or a list of job IDs
907 ea03467c Iustin Pop
    @rtype: list
908 ea03467c Iustin Pop
    @return: the list of job objects
909 ea03467c Iustin Pop

910 ea03467c Iustin Pop
    """
911 911a495b Iustin Pop
    if not job_ids:
912 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
913 f1da30e6 Michael Hanselmann
914 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
915 f1da30e6 Michael Hanselmann
916 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
928 686d7433 Iustin Pop
929 3ccafd0e Iustin Pop
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    Setting the flag writes an empty drain file, unsetting it removes
    the file.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      utils.WriteFile(drain_file, data="", close=True)
    return True
945 3ccafd0e Iustin Pop
946 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining
    @raise errors.JobQueueFull: if the number of job files has reached
        the hard limit

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    queue_size = len(self._ListJobFiles())
    if queue_size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    new_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, new_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # Add to worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
989 f1da30e6 Michael Hanselmann
990 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes. Once the file has been written, everybody waiting on the
    job's C{change} condition is notified.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Notify waiters about potential changes
    job.change.notifyAll()
1009 dfe57c22 Michael Hanselmann
1010 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The job is reloaded and compared against the state last seen by the
    caller until it differs, until it is no longer queued, running or
    waiting for locks, until it can no longer be loaded, or until the
    timeout expires.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries) or None
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

        if the job could not be loaded even once, None is returned

    """
    logging.debug("Waiting for changes in job %s", job_id)

    # Both stay None for as long as the job could not be loaded even once;
    # without this, leaving the loop on a missing job would fail on the
    # final return because the names were never bound.
    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    if job_info is None and log_entries is None:
      # The job was never loaded, so there is nothing to report back
      return None

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
1079 dfe57c22 Michael Hanselmann
1080 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs whose status is still "queued" or "waiting for locks" can
    be canceled. A queued job is marked as canceled right away, while for
    a job waiting for locks all opcodes are flagged as "canceling" and
    the job is saved.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple (bool, string)
    @return: whether the request was accepted and a message describing
        the outcome

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    current = job.CalcStatus()

    if current == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if current == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for opcode in job.ops:
          opcode.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Any other status means the job can't be canceled anymore
    logging.debug("Job %s is no longer in the queue", job.id)
    return (False, "Job %s is no longer in the queue" % job.id)
1117 fbf0262f Michael Hanselmann
1118 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks all opcodes of a job as canceled and saves the job.

    @type job: L{_QueuedJob}
    @param job: the job to be marked as canceled

    """
    reason = "Job canceled by request"
    try:
      for opcode in job.ops:
        opcode.status = constants.OP_STATUS_CANCELED
        opcode.result = reason
    finally:
      # The job is written out even if the loop above was interrupted
      self.UpdateJobUnlocked(job)
1129 188c5e0a Michael Hanselmann
1130 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Jobs whose status is neither canceled, success nor error are skipped;
    the files of all the other jobs are renamed from the job path to the
    archived job path in a single call.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    final_states = (constants.JOB_STATUS_CANCELED,
                    constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR)

    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() in final_states:
        archive_jobs.append(job)
        rename_files.append((self._GetJobPath(job.id),
                             self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    archived_ids = ", ".join([done.id for done in archive_jobs])
    logging.debug("Successfully archived job(s) %s", archived_ids)

    return len(archive_jobs)
1162 78d12585 Michael Hanselmann
1163 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    The job is loaded and handed over to L{_ArchiveJobsUnlocked}, which
    does the actual work.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived = self._ArchiveJobsUnlocked([job])
      return archived == 1

    logging.debug("Job %s not found", job_id)
    return False
1184 07cd723a Iustin Pop
1185 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and if that is missing too, the
    received timestamp. The special '-1' age will cause archival of
    all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the time in seconds after which no further jobs
        are looked at; jobs already selected are still archived
    @rtype: tuple (int, int)
    @return: the number of archived jobs and the number of jobs which
        were left unchecked; the latter is never negative

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    # Number of job IDs the loop below has reached; stays at zero for an
    # empty queue so that the "left" count can't become negative
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp the job has
      if job.end_timestamp is not None:
        job_ts = job.end_timestamp
      elif job.start_timestamp is not None:
        job_ts = job.start_timestamp
      else:
        job_ts = job.received_timestamp

      # Only the first element of the timestamp is compared
      if age == -1 or now - job_ts[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1240 07cd723a Iustin Pop
1241 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1242 ea03467c Iustin Pop
    """Returns information about a job.
1243 ea03467c Iustin Pop

1244 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1245 ea03467c Iustin Pop
    @param job: the job which we query
1246 ea03467c Iustin Pop
    @type fields: list
1247 ea03467c Iustin Pop
    @param fields: names of fields to return
1248 ea03467c Iustin Pop
    @rtype: list
1249 ea03467c Iustin Pop
    @return: list with one element for each field
1250 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1251 ea03467c Iustin Pop
        has been passed
1252 ea03467c Iustin Pop

1253 ea03467c Iustin Pop
    """
1254 e2715f69 Michael Hanselmann
    row = []
1255 e2715f69 Michael Hanselmann
    for fname in fields:
1256 e2715f69 Michael Hanselmann
      if fname == "id":
1257 e2715f69 Michael Hanselmann
        row.append(job.id)
1258 e2715f69 Michael Hanselmann
      elif fname == "status":
1259 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1260 af30b2fd Michael Hanselmann
      elif fname == "ops":
1261 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1262 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1263 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1264 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1265 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1266 5b23c34c Iustin Pop
      elif fname == "oplog":
1267 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1268 c56ec146 Iustin Pop
      elif fname == "opstart":
1269 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1270 c56ec146 Iustin Pop
      elif fname == "opend":
1271 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1272 c56ec146 Iustin Pop
      elif fname == "received_ts":
1273 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1274 c56ec146 Iustin Pop
      elif fname == "start_ts":
1275 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1276 c56ec146 Iustin Pop
      elif fname == "end_ts":
1277 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1278 60dd1473 Iustin Pop
      elif fname == "summary":
1279 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1280 e2715f69 Michael Hanselmann
      else:
1281 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1282 e2715f69 Michael Hanselmann
    return row
1283 e2715f69 Michael Hanselmann
1284 85f03e0d Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    The jobs are retrieved via L{_GetJobsUnlocked} and the requested
    fields of each of them are computed by L{_GetJobInfoUnlocked}.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields, or None for an entry for which no job
        object was returned

    """
    def _Describe(job):
      """Computes the requested fields, passing through missing jobs.

      """
      if job is None:
        return None
      return self._GetJobInfoUnlocked(job, fields)

    return [_Describe(job) for job in self._GetJobsUnlocked(job_ids)]
1310 e2715f69 Michael Hanselmann
1311 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1312 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1313 e2715f69 Michael Hanselmann
  def Shutdown(self):
1314 e2715f69 Michael Hanselmann
    """Stops the job queue.
1315 e2715f69 Michael Hanselmann

1316 ea03467c Iustin Pop
    This shuts down all the worker threads and closes the queue.
1317 ea03467c Iustin Pop

1318 e2715f69 Michael Hanselmann
    """
1319 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1320 85f03e0d Michael Hanselmann
1321 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1322 04ab05ce Michael Hanselmann
    # Drop the reference to the lock which has just been closed
    self._queue_lock = None