Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ f6424741

History | View | Annotate | Download (38.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
56 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
57 fbf0262f Michael Hanselmann

58 fbf0262f Michael Hanselmann
  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
62 ea03467c Iustin Pop
  """Returns the current timestamp.
63 ea03467c Iustin Pop

64 ea03467c Iustin Pop
  @rtype: tuple
65 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
66 ea03467c Iustin Pop

67 ea03467c Iustin Pop
  """
68 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
72 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
73 e2715f69 Michael Hanselmann

74 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
75 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
76 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
77 ea03467c Iustin Pop
  @ivar status: the current status
78 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
79 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
80 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
81 f1048938 Iustin Pop

82 e2715f69 Michael Hanselmann
  """
83 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
84 66d895a8 Iustin Pop
               "start_timestamp", "end_timestamp",
85 66d895a8 Iustin Pop
               "__weakref__"]
86 66d895a8 Iustin Pop
87 85f03e0d Michael Hanselmann
  def __init__(self, op):
88 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
89 ea03467c Iustin Pop

90 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
91 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
92 ea03467c Iustin Pop

93 ea03467c Iustin Pop
    """
94 85f03e0d Michael Hanselmann
    self.input = op
95 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
96 85f03e0d Michael Hanselmann
    self.result = None
97 85f03e0d Michael Hanselmann
    self.log = []
98 70552c46 Michael Hanselmann
    self.start_timestamp = None
99 70552c46 Michael Hanselmann
    self.end_timestamp = None
100 f1da30e6 Michael Hanselmann
101 f1da30e6 Michael Hanselmann
  @classmethod
102 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
103 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    @type state: dict
106 ea03467c Iustin Pop
    @param state: the serialized state
107 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
108 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
109 ea03467c Iustin Pop

110 ea03467c Iustin Pop
    """
111 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
112 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113 85f03e0d Michael Hanselmann
    obj.status = state["status"]
114 85f03e0d Michael Hanselmann
    obj.result = state["result"]
115 85f03e0d Michael Hanselmann
    obj.log = state["log"]
116 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
117 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
118 f1da30e6 Michael Hanselmann
    return obj
119 f1da30e6 Michael Hanselmann
120 f1da30e6 Michael Hanselmann
  def Serialize(self):
121 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
122 ea03467c Iustin Pop

123 ea03467c Iustin Pop
    @rtype: dict
124 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    """
127 6c5a7090 Michael Hanselmann
    return {
128 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
129 6c5a7090 Michael Hanselmann
      "status": self.status,
130 6c5a7090 Michael Hanselmann
      "result": self.result,
131 6c5a7090 Michael Hanselmann
      "log": self.log,
132 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
133 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
134 6c5a7090 Michael Hanselmann
      }
135 f1048938 Iustin Pop
136 e2715f69 Michael Hanselmann
137 e2715f69 Michael Hanselmann
class _QueuedJob(object):
138 e2715f69 Michael Hanselmann
  """In-memory job representation.
139 e2715f69 Michael Hanselmann

140 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
141 ea03467c Iustin Pop
  be taken care of by users of this class.
142 ea03467c Iustin Pop

143 ea03467c Iustin Pop
  @type queue: L{JobQueue}
144 ea03467c Iustin Pop
  @ivar queue: the parent queue
145 ea03467c Iustin Pop
  @ivar id: the job ID
146 ea03467c Iustin Pop
  @type ops: list
147 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
148 ea03467c Iustin Pop
  @type run_op_index: int
149 ea03467c Iustin Pop
  @ivar run_op_index: the currently executing opcode, or -1 if
150 ea03467c Iustin Pop
      we didn't yet start executing
151 ea03467c Iustin Pop
  @type log_serial: int
152 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
153 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
154 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
155 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
156 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
157 e2715f69 Michael Hanselmann

158 e2715f69 Michael Hanselmann
  """
159 66d895a8 Iustin Pop
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
161 66d895a8 Iustin Pop
               "change",
162 66d895a8 Iustin Pop
               "__weakref__"]
163 66d895a8 Iustin Pop
164 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
165 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
166 ea03467c Iustin Pop

167 ea03467c Iustin Pop
    @type queue: L{JobQueue}
168 ea03467c Iustin Pop
    @param queue: our parent queue
169 ea03467c Iustin Pop
    @type job_id: job_id
170 ea03467c Iustin Pop
    @param job_id: our job id
171 ea03467c Iustin Pop
    @type ops: list
172 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
173 ea03467c Iustin Pop
        in _QueuedOpCodes
174 ea03467c Iustin Pop

175 ea03467c Iustin Pop
    """
176 e2715f69 Michael Hanselmann
    if not ops:
177 ea03467c Iustin Pop
      # TODO: use a better exception
178 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
179 e2715f69 Michael Hanselmann
180 85f03e0d Michael Hanselmann
    self.queue = queue
181 f1da30e6 Michael Hanselmann
    self.id = job_id
182 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
183 85f03e0d Michael Hanselmann
    self.run_op_index = -1
184 6c5a7090 Michael Hanselmann
    self.log_serial = 0
185 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
186 c56ec146 Iustin Pop
    self.start_timestamp = None
187 c56ec146 Iustin Pop
    self.end_timestamp = None
188 6c5a7090 Michael Hanselmann
189 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
190 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
191 f1da30e6 Michael Hanselmann
192 f1da30e6 Michael Hanselmann
  @classmethod
193 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
194 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
195 ea03467c Iustin Pop

196 ea03467c Iustin Pop
    @type queue: L{JobQueue}
197 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
198 ea03467c Iustin Pop
    @type state: dict
199 ea03467c Iustin Pop
    @param state: the serialized state
200 ea03467c Iustin Pop
    @rtype: _JobQueue
201 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
202 ea03467c Iustin Pop

203 ea03467c Iustin Pop
    """
204 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
205 85f03e0d Michael Hanselmann
    obj.queue = queue
206 85f03e0d Michael Hanselmann
    obj.id = state["id"]
207 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
208 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
209 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
210 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
211 6c5a7090 Michael Hanselmann
212 6c5a7090 Michael Hanselmann
    obj.ops = []
213 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
214 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
215 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
216 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
217 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
218 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
219 6c5a7090 Michael Hanselmann
220 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
221 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
222 6c5a7090 Michael Hanselmann
223 f1da30e6 Michael Hanselmann
    return obj
224 f1da30e6 Michael Hanselmann
225 f1da30e6 Michael Hanselmann
  def Serialize(self):
226 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
227 ea03467c Iustin Pop

228 ea03467c Iustin Pop
    @rtype: dict
229 ea03467c Iustin Pop
    @return: the serialized state
230 ea03467c Iustin Pop

231 ea03467c Iustin Pop
    """
232 f1da30e6 Michael Hanselmann
    return {
233 f1da30e6 Michael Hanselmann
      "id": self.id,
234 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
235 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
236 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
237 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
238 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
239 f1da30e6 Michael Hanselmann
      }
240 f1da30e6 Michael Hanselmann
241 85f03e0d Michael Hanselmann
  def CalcStatus(self):
242 ea03467c Iustin Pop
    """Compute the status of this job.
243 ea03467c Iustin Pop

244 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
245 ea03467c Iustin Pop
    based on their status, computes the job status.
246 ea03467c Iustin Pop

247 ea03467c Iustin Pop
    The algorithm is:
248 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
249 ea03467c Iustin Pop
        status will be the same
250 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
251 ea03467c Iustin Pop
          - waitlock
252 fbf0262f Michael Hanselmann
          - canceling
253 ea03467c Iustin Pop
          - running
254 ea03467c Iustin Pop

255 ea03467c Iustin Pop
        will determine the job status
256 ea03467c Iustin Pop

257 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
258 ea03467c Iustin Pop
        and the job status will be the same
259 ea03467c Iustin Pop

260 ea03467c Iustin Pop
    @return: the job status
261 ea03467c Iustin Pop

262 ea03467c Iustin Pop
    """
263 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
264 e2715f69 Michael Hanselmann
265 e2715f69 Michael Hanselmann
    all_success = True
266 85f03e0d Michael Hanselmann
    for op in self.ops:
267 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
268 e2715f69 Michael Hanselmann
        continue
269 e2715f69 Michael Hanselmann
270 e2715f69 Michael Hanselmann
      all_success = False
271 e2715f69 Michael Hanselmann
272 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
273 e2715f69 Michael Hanselmann
        pass
274 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
275 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
276 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
277 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
278 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
279 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
280 fbf0262f Michael Hanselmann
        break
281 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
282 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
283 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
284 f1da30e6 Michael Hanselmann
        break
285 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
286 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
287 4cb1d919 Michael Hanselmann
        break
288 e2715f69 Michael Hanselmann
289 e2715f69 Michael Hanselmann
    if all_success:
290 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
291 e2715f69 Michael Hanselmann
292 e2715f69 Michael Hanselmann
    return status
293 e2715f69 Michael Hanselmann
294 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
295 ea03467c Iustin Pop
    """Selectively returns the log entries.
296 ea03467c Iustin Pop

297 ea03467c Iustin Pop
    @type newer_than: None or int
298 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
299 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
300 ea03467c Iustin Pop
        than this value
301 ea03467c Iustin Pop
    @rtype: list
302 ea03467c Iustin Pop
    @return: the list of the log entries selected
303 ea03467c Iustin Pop

304 ea03467c Iustin Pop
    """
305 6c5a7090 Michael Hanselmann
    if newer_than is None:
306 6c5a7090 Michael Hanselmann
      serial = -1
307 6c5a7090 Michael Hanselmann
    else:
308 6c5a7090 Michael Hanselmann
      serial = newer_than
309 6c5a7090 Michael Hanselmann
310 6c5a7090 Michael Hanselmann
    entries = []
311 6c5a7090 Michael Hanselmann
    for op in self.ops:
312 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313 6c5a7090 Michael Hanselmann
314 6c5a7090 Michael Hanselmann
    return entries
315 6c5a7090 Michael Hanselmann
316 f1048938 Iustin Pop
317 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
318 ea03467c Iustin Pop
  """The actual job workers.
319 ea03467c Iustin Pop

320 ea03467c Iustin Pop
  """
321 e92376d7 Iustin Pop
  def _NotifyStart(self):
322 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
323 e92376d7 Iustin Pop

324 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
325 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
326 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
327 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
328 e92376d7 Iustin Pop

329 e92376d7 Iustin Pop
    """
330 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
331 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
332 e92376d7 Iustin Pop
333 e92376d7 Iustin Pop
    self.queue.acquire()
334 e92376d7 Iustin Pop
    try:
335 fbf0262f Michael Hanselmann
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
336 fbf0262f Michael Hanselmann
                                    constants.OP_STATUS_CANCELING)
337 fbf0262f Michael Hanselmann
338 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
339 fbf0262f Michael Hanselmann
      if self.opcode.status == constants.OP_STATUS_CANCELING:
340 fbf0262f Michael Hanselmann
        raise CancelJob()
341 fbf0262f Michael Hanselmann
342 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
343 e92376d7 Iustin Pop
    finally:
344 e92376d7 Iustin Pop
      self.queue.release()
345 e92376d7 Iustin Pop
346 85f03e0d Michael Hanselmann
  def RunTask(self, job):
347 e2715f69 Michael Hanselmann
    """Job executor.
348 e2715f69 Michael Hanselmann

349 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
350 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
351 e2715f69 Michael Hanselmann

352 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
353 ea03467c Iustin Pop
    @param job: the job to be processed
354 ea03467c Iustin Pop

355 e2715f69 Michael Hanselmann
    """
356 d21d09d6 Iustin Pop
    logging.info("Worker %s processing job %s",
357 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
358 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
359 e92376d7 Iustin Pop
    self.queue = queue = job.queue
360 e2715f69 Michael Hanselmann
    try:
361 85f03e0d Michael Hanselmann
      try:
362 85f03e0d Michael Hanselmann
        count = len(job.ops)
363 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
364 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
365 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
366 f6424741 Iustin Pop
            # this is a job that was partially completed before master
367 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
368 f6424741 Iustin Pop
            # are already completed successfully (if any did error
369 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
370 f6424741 Iustin Pop
            # resubmitted for processing)
371 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
372 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
373 f6424741 Iustin Pop
            continue
374 85f03e0d Michael Hanselmann
          try:
375 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
376 d21d09d6 Iustin Pop
                         op_summary)
377 85f03e0d Michael Hanselmann
378 85f03e0d Michael Hanselmann
            queue.acquire()
379 85f03e0d Michael Hanselmann
            try:
380 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
381 df0fb067 Iustin Pop
                raise CancelJob()
382 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
383 85f03e0d Michael Hanselmann
              job.run_op_index = idx
384 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
385 85f03e0d Michael Hanselmann
              op.result = None
386 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
387 c56ec146 Iustin Pop
              if idx == 0: # first opcode
388 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
389 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
390 85f03e0d Michael Hanselmann
391 38206f3c Iustin Pop
              input_opcode = op.input
392 85f03e0d Michael Hanselmann
            finally:
393 85f03e0d Michael Hanselmann
              queue.release()
394 85f03e0d Michael Hanselmann
395 dfe57c22 Michael Hanselmann
            def _Log(*args):
396 6c5a7090 Michael Hanselmann
              """Append a log entry.
397 6c5a7090 Michael Hanselmann

398 6c5a7090 Michael Hanselmann
              """
399 6c5a7090 Michael Hanselmann
              assert len(args) < 3
400 6c5a7090 Michael Hanselmann
401 6c5a7090 Michael Hanselmann
              if len(args) == 1:
402 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
403 6c5a7090 Michael Hanselmann
                log_msg = args[0]
404 6c5a7090 Michael Hanselmann
              else:
405 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
406 6c5a7090 Michael Hanselmann
407 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
408 6c5a7090 Michael Hanselmann
              # precision.
409 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
410 dfe57c22 Michael Hanselmann
411 6c5a7090 Michael Hanselmann
              queue.acquire()
412 dfe57c22 Michael Hanselmann
              try:
413 6c5a7090 Michael Hanselmann
                job.log_serial += 1
414 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
415 6c5a7090 Michael Hanselmann
416 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
417 dfe57c22 Michael Hanselmann
              finally:
418 6c5a7090 Michael Hanselmann
                queue.release()
419 dfe57c22 Michael Hanselmann
420 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
421 e92376d7 Iustin Pop
            self.opcode = op
422 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
423 85f03e0d Michael Hanselmann
424 85f03e0d Michael Hanselmann
            queue.acquire()
425 85f03e0d Michael Hanselmann
            try:
426 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
427 85f03e0d Michael Hanselmann
              op.result = result
428 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
429 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
430 85f03e0d Michael Hanselmann
            finally:
431 85f03e0d Michael Hanselmann
              queue.release()
432 85f03e0d Michael Hanselmann
433 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
434 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
435 fbf0262f Michael Hanselmann
          except CancelJob:
436 fbf0262f Michael Hanselmann
            # Will be handled further up
437 fbf0262f Michael Hanselmann
            raise
438 85f03e0d Michael Hanselmann
          except Exception, err:
439 85f03e0d Michael Hanselmann
            queue.acquire()
440 85f03e0d Michael Hanselmann
            try:
441 85f03e0d Michael Hanselmann
              try:
442 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
443 85f03e0d Michael Hanselmann
                op.result = str(err)
444 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
445 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
446 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
447 85f03e0d Michael Hanselmann
              finally:
448 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
449 85f03e0d Michael Hanselmann
            finally:
450 85f03e0d Michael Hanselmann
              queue.release()
451 85f03e0d Michael Hanselmann
            raise
452 85f03e0d Michael Hanselmann
453 fbf0262f Michael Hanselmann
      except CancelJob:
454 fbf0262f Michael Hanselmann
        queue.acquire()
455 fbf0262f Michael Hanselmann
        try:
456 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
457 fbf0262f Michael Hanselmann
        finally:
458 fbf0262f Michael Hanselmann
          queue.release()
459 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
460 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
461 85f03e0d Michael Hanselmann
      except:
462 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
463 e2715f69 Michael Hanselmann
    finally:
464 85f03e0d Michael Hanselmann
      queue.acquire()
465 85f03e0d Michael Hanselmann
      try:
466 65548ed5 Michael Hanselmann
        try:
467 ed21712b Iustin Pop
          job.run_op_index = -1
468 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
469 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
470 65548ed5 Michael Hanselmann
        finally:
471 65548ed5 Michael Hanselmann
          job_id = job.id
472 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
473 85f03e0d Michael Hanselmann
      finally:
474 85f03e0d Michael Hanselmann
        queue.release()
475 d21d09d6 Iustin Pop
      logging.info("Worker %s finished job %s, status = %s",
476 d21d09d6 Iustin Pop
                   self.worker_id, job_id, status)
477 e2715f69 Michael Hanselmann
478 e2715f69 Michael Hanselmann
479 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
480 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
481 ea03467c Iustin Pop

482 ea03467c Iustin Pop
  """
483 5bdce580 Michael Hanselmann
  def __init__(self, queue):
484 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
485 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
486 5bdce580 Michael Hanselmann
    self.queue = queue
487 e2715f69 Michael Hanselmann
488 e2715f69 Michael Hanselmann
489 85f03e0d Michael Hanselmann
class JobQueue(object):
490 5bbd3f7f Michael Hanselmann
  """Queue used to manage the jobs.
491 ea03467c Iustin Pop

492 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
493 ea03467c Iustin Pop

494 ea03467c Iustin Pop
  """
495 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
496 f1da30e6 Michael Hanselmann
497 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
498 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
499 db37da70 Michael Hanselmann

500 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
501 ea03467c Iustin Pop
    functions usually called from other classes.
502 db37da70 Michael Hanselmann

503 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
504 db37da70 Michael Hanselmann

505 ea03467c Iustin Pop
    Example::
506 db37da70 Michael Hanselmann
      @utils.LockedMethod
507 db37da70 Michael Hanselmann
      @_RequireOpenQueue
508 db37da70 Michael Hanselmann
      def Example(self):
509 db37da70 Michael Hanselmann
        pass
510 db37da70 Michael Hanselmann

511 db37da70 Michael Hanselmann
    """
512 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
513 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
514 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
515 db37da70 Michael Hanselmann
    return wrapper
516 db37da70 Michael Hanselmann
517 85f03e0d Michael Hanselmann
  def __init__(self, context):
518 ea03467c Iustin Pop
    """Constructor for JobQueue.
519 ea03467c Iustin Pop

520 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
521 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
522 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
523 ea03467c Iustin Pop
    running).
524 ea03467c Iustin Pop

525 ea03467c Iustin Pop
    @type context: GanetiContext
526 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
527 ea03467c Iustin Pop
        data and other ganeti objects
528 ea03467c Iustin Pop

529 ea03467c Iustin Pop
    """
530 5bdce580 Michael Hanselmann
    self.context = context
531 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
532 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
533 f1da30e6 Michael Hanselmann
534 85f03e0d Michael Hanselmann
    # Locking
535 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
536 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
537 85f03e0d Michael Hanselmann
    self.release = self._lock.release
538 85f03e0d Michael Hanselmann
539 04ab05ce Michael Hanselmann
    # Initialize
540 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
541 f1da30e6 Michael Hanselmann
542 04ab05ce Michael Hanselmann
    # Read serial file
543 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
544 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
545 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
546 c4beba1c Iustin Pop
547 23752136 Michael Hanselmann
    # Get initial list of nodes
548 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
549 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
550 59303563 Iustin Pop
                       if n.master_candidate)
551 8e00939c Michael Hanselmann
552 8e00939c Michael Hanselmann
    # Remove master node
553 8e00939c Michael Hanselmann
    try:
554 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
555 33987705 Iustin Pop
    except KeyError:
556 8e00939c Michael Hanselmann
      pass
557 23752136 Michael Hanselmann
558 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
559 23752136 Michael Hanselmann
560 85f03e0d Michael Hanselmann
    # Setup worker pool
561 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
562 85f03e0d Michael Hanselmann
    try:
563 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
564 16714921 Michael Hanselmann
      # we're still doing our work.
565 16714921 Michael Hanselmann
      self.acquire()
566 16714921 Michael Hanselmann
      try:
567 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
568 711b5124 Michael Hanselmann
569 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
570 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
571 711b5124 Michael Hanselmann
        lastinfo = time.time()
572 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
573 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
574 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
575 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
576 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
577 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
578 711b5124 Michael Hanselmann
            lastinfo = time.time()
579 711b5124 Michael Hanselmann
580 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
581 711b5124 Michael Hanselmann
582 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
583 16714921 Michael Hanselmann
          if job is None:
584 16714921 Michael Hanselmann
            continue
585 94ed59a5 Iustin Pop
586 16714921 Michael Hanselmann
          status = job.CalcStatus()
587 85f03e0d Michael Hanselmann
588 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
589 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
590 85f03e0d Michael Hanselmann
591 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
592 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
593 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
594 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
595 16714921 Michael Hanselmann
            try:
596 16714921 Michael Hanselmann
              for op in job.ops:
597 16714921 Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
598 16714921 Michael Hanselmann
                op.result = "Unclean master daemon shutdown"
599 16714921 Michael Hanselmann
            finally:
600 16714921 Michael Hanselmann
              self.UpdateJobUnlocked(job)
601 711b5124 Michael Hanselmann
602 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
603 16714921 Michael Hanselmann
      finally:
604 16714921 Michael Hanselmann
        self.release()
605 16714921 Michael Hanselmann
    except:
606 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
607 16714921 Michael Hanselmann
      raise
608 85f03e0d Michael Hanselmann
609 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
610 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
611 99aabbed Iustin Pop
  def AddNode(self, node):
612 99aabbed Iustin Pop
    """Register a new node with the queue.
613 99aabbed Iustin Pop

614 99aabbed Iustin Pop
    @type node: L{objects.Node}
615 99aabbed Iustin Pop
    @param node: the node object to be added
616 99aabbed Iustin Pop

617 99aabbed Iustin Pop
    """
618 99aabbed Iustin Pop
    node_name = node.name
619 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
620 23752136 Michael Hanselmann
621 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
622 a3811745 Michael Hanselmann
    rpc.RpcRunner.call_jobqueue_purge(node_name)
623 23752136 Michael Hanselmann
624 59303563 Iustin Pop
    if not node.master_candidate:
625 59303563 Iustin Pop
      # remove if existing, ignoring errors
626 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
627 59303563 Iustin Pop
      # and skip the replication of the job ids
628 59303563 Iustin Pop
      return
629 59303563 Iustin Pop
630 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
631 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
632 23752136 Michael Hanselmann
633 d2e03a33 Michael Hanselmann
    # Upload current serial file
634 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
635 d2e03a33 Michael Hanselmann
636 d2e03a33 Michael Hanselmann
    for file_name in files:
637 9f774ee8 Michael Hanselmann
      # Read file content
638 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
639 9f774ee8 Michael Hanselmann
      try:
640 9f774ee8 Michael Hanselmann
        content = fd.read()
641 9f774ee8 Michael Hanselmann
      finally:
642 9f774ee8 Michael Hanselmann
        fd.close()
643 9f774ee8 Michael Hanselmann
644 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
645 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
646 a3811745 Michael Hanselmann
                                                  file_name, content)
647 d2e03a33 Michael Hanselmann
      if not result[node_name]:
648 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
649 d2e03a33 Michael Hanselmann
650 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
651 d2e03a33 Michael Hanselmann
652 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
653 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
654 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
655 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
656 ea03467c Iustin Pop

657 ea03467c Iustin Pop
    @type node_name: str
658 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
659 ea03467c Iustin Pop

660 ea03467c Iustin Pop
    """
661 23752136 Michael Hanselmann
    try:
662 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
663 99aabbed Iustin Pop
      del self._nodes[node_name]
664 d2e03a33 Michael Hanselmann
    except KeyError:
665 23752136 Michael Hanselmann
      pass
666 23752136 Michael Hanselmann
667 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
668 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
669 ea03467c Iustin Pop

670 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
671 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
672 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
673 ea03467c Iustin Pop

674 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
675 ea03467c Iustin Pop
    @type nodes: list
676 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
677 ea03467c Iustin Pop
    @type failmsg: str
678 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
679 ea03467c Iustin Pop

680 ea03467c Iustin Pop
    """
681 e74798c1 Michael Hanselmann
    failed = []
682 e74798c1 Michael Hanselmann
    success = []
683 e74798c1 Michael Hanselmann
684 e74798c1 Michael Hanselmann
    for node in nodes:
685 e74798c1 Michael Hanselmann
      if result[node]:
686 e74798c1 Michael Hanselmann
        success.append(node)
687 e74798c1 Michael Hanselmann
      else:
688 e74798c1 Michael Hanselmann
        failed.append(node)
689 e74798c1 Michael Hanselmann
690 e74798c1 Michael Hanselmann
    if failed:
691 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
692 e74798c1 Michael Hanselmann
693 e74798c1 Michael Hanselmann
    # +1 for the master node
694 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
695 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
696 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
697 e74798c1 Michael Hanselmann
698 99aabbed Iustin Pop
  def _GetNodeIp(self):
699 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
700 99aabbed Iustin Pop

701 ea03467c Iustin Pop
    @rtype: (list, list)
702 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
703 ea03467c Iustin Pop
        names and the second one with the node addresses
704 ea03467c Iustin Pop

705 99aabbed Iustin Pop
    """
706 99aabbed Iustin Pop
    name_list = self._nodes.keys()
707 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
708 99aabbed Iustin Pop
    return name_list, addr_list
709 99aabbed Iustin Pop
710 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
711 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
712 8e00939c Michael Hanselmann

713 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
714 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
715 ea03467c Iustin Pop

716 ea03467c Iustin Pop
    @type file_name: str
717 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
718 ea03467c Iustin Pop
    @type data: str
719 ea03467c Iustin Pop
    @param data: the new contents of the file
720 ea03467c Iustin Pop

721 8e00939c Michael Hanselmann
    """
722 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
723 8e00939c Michael Hanselmann
724 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
725 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
726 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
727 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
728 23752136 Michael Hanselmann
729 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
730 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
731 ea03467c Iustin Pop

732 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
733 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
734 ea03467c Iustin Pop

735 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
736 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
737 ea03467c Iustin Pop

738 ea03467c Iustin Pop
    """
739 dd875d32 Michael Hanselmann
    # Rename them locally
740 d7fd1f28 Michael Hanselmann
    for old, new in rename:
741 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
742 abc1f2ce Michael Hanselmann
743 dd875d32 Michael Hanselmann
    # ... and on all nodes
744 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
745 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
746 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
747 abc1f2ce Michael Hanselmann
748 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
749 ea03467c Iustin Pop
    """Convert a job ID to string format.
750 ea03467c Iustin Pop

751 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
752 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
753 ea03467c Iustin Pop
    abstract this change.
754 ea03467c Iustin Pop

755 ea03467c Iustin Pop
    @type job_id: int or long
756 ea03467c Iustin Pop
    @param job_id: the numeric job id
757 ea03467c Iustin Pop
    @rtype: str
758 ea03467c Iustin Pop
    @return: the formatted job id
759 ea03467c Iustin Pop

760 ea03467c Iustin Pop
    """
761 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
762 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
763 85f03e0d Michael Hanselmann
    if job_id < 0:
764 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
765 85f03e0d Michael Hanselmann
766 85f03e0d Michael Hanselmann
    return str(job_id)
767 85f03e0d Michael Hanselmann
768 58b22b6e Michael Hanselmann
  @classmethod
769 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
770 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
771 58b22b6e Michael Hanselmann

772 58b22b6e Michael Hanselmann
    @type job_id: str
773 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
774 58b22b6e Michael Hanselmann
    @rtype: str
775 58b22b6e Michael Hanselmann
    @return: Directory name
776 58b22b6e Michael Hanselmann

777 58b22b6e Michael Hanselmann
    """
778 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
779 58b22b6e Michael Hanselmann
780 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
781 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
782 f1da30e6 Michael Hanselmann

783 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
784 f1da30e6 Michael Hanselmann

785 ea03467c Iustin Pop
    @rtype: str
786 ea03467c Iustin Pop
    @return: a string representing the job identifier.
787 f1da30e6 Michael Hanselmann

788 f1da30e6 Michael Hanselmann
    """
789 f1da30e6 Michael Hanselmann
    # New number
790 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
791 f1da30e6 Michael Hanselmann
792 f1da30e6 Michael Hanselmann
    # Write to file
793 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
794 23752136 Michael Hanselmann
                                        "%s\n" % serial)
795 f1da30e6 Michael Hanselmann
796 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
797 f1da30e6 Michael Hanselmann
    self._last_serial = serial
798 f1da30e6 Michael Hanselmann
799 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
800 f1da30e6 Michael Hanselmann
801 85f03e0d Michael Hanselmann
  @staticmethod
802 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
803 ea03467c Iustin Pop
    """Returns the job file for a given job id.
804 ea03467c Iustin Pop

805 ea03467c Iustin Pop
    @type job_id: str
806 ea03467c Iustin Pop
    @param job_id: the job identifier
807 ea03467c Iustin Pop
    @rtype: str
808 ea03467c Iustin Pop
    @return: the path to the job file
809 ea03467c Iustin Pop

810 ea03467c Iustin Pop
    """
811 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
812 f1da30e6 Michael Hanselmann
813 58b22b6e Michael Hanselmann
  @classmethod
814 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
815 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
816 ea03467c Iustin Pop

817 ea03467c Iustin Pop
    @type job_id: str
818 ea03467c Iustin Pop
    @param job_id: the job identifier
819 ea03467c Iustin Pop
    @rtype: str
820 ea03467c Iustin Pop
    @return: the path to the archived job file
821 ea03467c Iustin Pop

822 ea03467c Iustin Pop
    """
823 58b22b6e Michael Hanselmann
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
824 58b22b6e Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
825 0cb94105 Michael Hanselmann
826 85f03e0d Michael Hanselmann
  @classmethod
827 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
828 ea03467c Iustin Pop
    """Extract the job id from a filename.
829 ea03467c Iustin Pop

830 ea03467c Iustin Pop
    @type name: str
831 ea03467c Iustin Pop
    @param name: the job filename
832 ea03467c Iustin Pop
    @rtype: job id or None
833 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
834 ea03467c Iustin Pop
        or None if the filename does not represent a valid
835 ea03467c Iustin Pop
        job file
836 ea03467c Iustin Pop

837 ea03467c Iustin Pop
    """
838 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
839 fae737ac Michael Hanselmann
    if m:
840 fae737ac Michael Hanselmann
      return m.group(1)
841 fae737ac Michael Hanselmann
    else:
842 fae737ac Michael Hanselmann
      return None
843 fae737ac Michael Hanselmann
844 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
845 911a495b Iustin Pop
    """Return all known job IDs.
846 911a495b Iustin Pop

847 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
848 911a495b Iustin Pop
    included. Currently this argument is unused.
849 911a495b Iustin Pop

850 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
851 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
852 ac0930b9 Iustin Pop
    extra IDs).
853 ac0930b9 Iustin Pop

854 ea03467c Iustin Pop
    @rtype: list
855 ea03467c Iustin Pop
    @return: the list of job IDs
856 ea03467c Iustin Pop

857 911a495b Iustin Pop
    """
858 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
859 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
860 f0d874fe Iustin Pop
    return jlist
861 911a495b Iustin Pop
862 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
863 ea03467c Iustin Pop
    """Returns the list of current job files.
864 ea03467c Iustin Pop

865 ea03467c Iustin Pop
    @rtype: list
866 ea03467c Iustin Pop
    @return: the list of job file names
867 ea03467c Iustin Pop

868 ea03467c Iustin Pop
    """
869 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
870 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
871 f1da30e6 Michael Hanselmann
872 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
873 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
874 ea03467c Iustin Pop

875 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
876 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
877 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
878 ea03467c Iustin Pop

879 ea03467c Iustin Pop
    @param job_id: the job id
880 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
881 ea03467c Iustin Pop
    @return: either None or the job object
882 ea03467c Iustin Pop

883 ea03467c Iustin Pop
    """
884 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
885 5685c1a5 Michael Hanselmann
    if job:
886 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
887 5685c1a5 Michael Hanselmann
      return job
888 ac0930b9 Iustin Pop
889 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
890 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
891 f1da30e6 Michael Hanselmann
    try:
892 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
893 f1da30e6 Michael Hanselmann
    except IOError, err:
894 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
895 f1da30e6 Michael Hanselmann
        return None
896 f1da30e6 Michael Hanselmann
      raise
897 f1da30e6 Michael Hanselmann
    try:
898 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
899 f1da30e6 Michael Hanselmann
    finally:
900 f1da30e6 Michael Hanselmann
      fd.close()
901 f1da30e6 Michael Hanselmann
902 94ed59a5 Iustin Pop
    try:
903 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
904 94ed59a5 Iustin Pop
    except Exception, err:
905 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
906 94ed59a5 Iustin Pop
      if filepath == new_path:
907 94ed59a5 Iustin Pop
        # job already archived (future case)
908 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
909 94ed59a5 Iustin Pop
      else:
910 94ed59a5 Iustin Pop
        # non-archived case
911 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
912 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
913 94ed59a5 Iustin Pop
      return None
914 94ed59a5 Iustin Pop
915 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
916 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
917 ac0930b9 Iustin Pop
    return job
918 f1da30e6 Michael Hanselmann
919 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
920 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
921 ea03467c Iustin Pop

922 ea03467c Iustin Pop
    @type job_ids: list
923 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
924 ea03467c Iustin Pop
        or a list of job IDs
925 ea03467c Iustin Pop
    @rtype: list
926 ea03467c Iustin Pop
    @return: the list of job objects
927 ea03467c Iustin Pop

928 ea03467c Iustin Pop
    """
929 911a495b Iustin Pop
    if not job_ids:
930 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
931 f1da30e6 Michael Hanselmann
932 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
933 f1da30e6 Michael Hanselmann
934 686d7433 Iustin Pop
  @staticmethod
935 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
936 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
937 686d7433 Iustin Pop

938 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
939 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
940 686d7433 Iustin Pop

941 ea03467c Iustin Pop
    @rtype: boolean
942 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
943 ea03467c Iustin Pop

944 686d7433 Iustin Pop
    """
945 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
946 686d7433 Iustin Pop
947 3ccafd0e Iustin Pop
  @staticmethod
948 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
949 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
950 3ccafd0e Iustin Pop

951 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
952 3ccafd0e Iustin Pop
    and in the future we might merge them.
953 3ccafd0e Iustin Pop

954 ea03467c Iustin Pop
    @type drain_flag: boolean
955 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
956 ea03467c Iustin Pop

957 3ccafd0e Iustin Pop
    """
958 3ccafd0e Iustin Pop
    if drain_flag:
959 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
960 3ccafd0e Iustin Pop
    else:
961 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
962 3ccafd0e Iustin Pop
    return True
963 3ccafd0e Iustin Pop
964 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
965 db37da70 Michael Hanselmann
  @_RequireOpenQueue
966 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
967 85f03e0d Michael Hanselmann
    """Create and store a new job.
968 f1da30e6 Michael Hanselmann

969 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
970 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
971 c3f0a12f Iustin Pop

972 c3f0a12f Iustin Pop
    @type ops: list
973 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
974 ea03467c Iustin Pop
    @rtype: job ID
975 ea03467c Iustin Pop
    @return: the job ID of the newly created job
976 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
977 c3f0a12f Iustin Pop

978 c3f0a12f Iustin Pop
    """
979 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
980 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
981 f87b405e Michael Hanselmann
982 f87b405e Michael Hanselmann
    # Check job queue size
983 f87b405e Michael Hanselmann
    size = len(self._ListJobFiles())
984 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
985 f87b405e Michael Hanselmann
      # TODO: Autoarchive jobs. Make sure it's not done on every job
986 f87b405e Michael Hanselmann
      # submission, though.
987 f87b405e Michael Hanselmann
      #size = ...
988 f87b405e Michael Hanselmann
      pass
989 f87b405e Michael Hanselmann
990 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
991 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
992 f87b405e Michael Hanselmann
993 f1da30e6 Michael Hanselmann
    # Get job identifier
994 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
995 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
996 f1da30e6 Michael Hanselmann
997 f1da30e6 Michael Hanselmann
    # Write to disk
998 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
999 f1da30e6 Michael Hanselmann
1000 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1001 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1002 ac0930b9 Iustin Pop
1003 85f03e0d Michael Hanselmann
    # Add to worker pool
1004 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
1005 85f03e0d Michael Hanselmann
1006 85f03e0d Michael Hanselmann
    return job.id
1007 f1da30e6 Michael Hanselmann
1008 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1009 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
1010 ea03467c Iustin Pop
    """Update a job's on disk storage.
1011 ea03467c Iustin Pop

1012 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1013 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1014 ea03467c Iustin Pop
    nodes.
1015 ea03467c Iustin Pop

1016 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1017 ea03467c Iustin Pop
    @param job: the changed job
1018 ea03467c Iustin Pop

1019 ea03467c Iustin Pop
    """
1020 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1021 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1022 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1023 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
1024 ac0930b9 Iustin Pop
1025 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
1026 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
1027 dfe57c22 Michael Hanselmann
1028 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
1029 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
1030 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1031 5c735209 Iustin Pop
                        timeout):
1032 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1033 6c5a7090 Michael Hanselmann

1034 6c5a7090 Michael Hanselmann
    @type job_id: string
1035 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1036 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1037 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1038 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1039 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1040 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1041 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1042 5c735209 Iustin Pop
    @type timeout: float
1043 5c735209 Iustin Pop
    @param timeout: maximum time to wait
1044 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1045 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1046 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1047 ea03467c Iustin Pop

1048 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1049 ea03467c Iustin Pop
        we instead return a special value,
1050 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1051 ea03467c Iustin Pop
        as such by the clients
1052 6c5a7090 Michael Hanselmann

1053 6c5a7090 Michael Hanselmann
    """
1054 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
1055 5c735209 Iustin Pop
    end_time = time.time() + timeout
1056 dfe57c22 Michael Hanselmann
    while True:
1057 5c735209 Iustin Pop
      delta_time = end_time - time.time()
1058 5c735209 Iustin Pop
      if delta_time < 0:
1059 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
1060 5c735209 Iustin Pop
1061 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1062 6c5a7090 Michael Hanselmann
      if not job:
1063 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
1064 6c5a7090 Michael Hanselmann
        break
1065 dfe57c22 Michael Hanselmann
1066 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
1067 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
1068 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
1069 dfe57c22 Michael Hanselmann
1070 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
1071 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
1072 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
1073 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
1074 dfe57c22 Michael Hanselmann
      # significantly different.
1075 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1076 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1077 dfe57c22 Michael Hanselmann
1078 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
1079 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
1080 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
1081 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
1082 6c5a7090 Michael Hanselmann
        # no changes.
1083 dfe57c22 Michael Hanselmann
        break
1084 dfe57c22 Michael Hanselmann
1085 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
1086 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
1087 6c5a7090 Michael Hanselmann
        break
1088 6c5a7090 Michael Hanselmann
1089 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
1090 6c5a7090 Michael Hanselmann
1091 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
1092 5c735209 Iustin Pop
      job.change.wait(delta_time)
1093 dfe57c22 Michael Hanselmann
1094 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
1095 dfe57c22 Michael Hanselmann
1096 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
1097 dfe57c22 Michael Hanselmann
1098 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1099 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1100 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1101 188c5e0a Michael Hanselmann
    """Cancels a job.
1102 188c5e0a Michael Hanselmann

1103 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1104 ea03467c Iustin Pop

1105 188c5e0a Michael Hanselmann
    @type job_id: string
1106 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1107 188c5e0a Michael Hanselmann

1108 188c5e0a Michael Hanselmann
    """
1109 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1110 188c5e0a Michael Hanselmann
1111 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1112 188c5e0a Michael Hanselmann
    if not job:
1113 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1114 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1115 fbf0262f Michael Hanselmann
1116 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1117 188c5e0a Michael Hanselmann
1118 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1119 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1120 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
1121 fbf0262f Michael Hanselmann
      return (False, "Job %s is no longer in the queue" % job.id)
1122 fbf0262f Michael Hanselmann
1123 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1124 fbf0262f Michael Hanselmann
      self.CancelJobUnlocked(job)
1125 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1126 188c5e0a Michael Hanselmann
1127 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1128 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1129 fbf0262f Michael Hanselmann
      try:
1130 fbf0262f Michael Hanselmann
        for op in job.ops:
1131 fbf0262f Michael Hanselmann
          op.status = constants.OP_STATUS_CANCELING
1132 fbf0262f Michael Hanselmann
      finally:
1133 fbf0262f Michael Hanselmann
        self.UpdateJobUnlocked(job)
1134 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1135 fbf0262f Michael Hanselmann
1136 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1137 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
1138 fbf0262f Michael Hanselmann
    """Marks a job as canceled.
1139 fbf0262f Michael Hanselmann

1140 fbf0262f Michael Hanselmann
    """
1141 85f03e0d Michael Hanselmann
    try:
1142 85f03e0d Michael Hanselmann
      for op in job.ops:
1143 df0fb067 Iustin Pop
        op.status = constants.OP_STATUS_CANCELED
1144 fbf0262f Michael Hanselmann
        op.result = "Job canceled by request"
1145 85f03e0d Michael Hanselmann
    finally:
1146 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1147 188c5e0a Michael Hanselmann
1148 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1149 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1150 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1151 c609f802 Michael Hanselmann

1152 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1153 25e7b43f Iustin Pop
    @param jobs: Job objects
1154 d7fd1f28 Michael Hanselmann
    @rtype: int
1155 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1156 c609f802 Michael Hanselmann

1157 c609f802 Michael Hanselmann
    """
1158 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1159 d7fd1f28 Michael Hanselmann
    rename_files = []
1160 d7fd1f28 Michael Hanselmann
    for job in jobs:
1161 d7fd1f28 Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1162 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_SUCCESS,
1163 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_ERROR):
1164 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1165 d7fd1f28 Michael Hanselmann
        continue
1166 c609f802 Michael Hanselmann
1167 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1168 c609f802 Michael Hanselmann
1169 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1170 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1171 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1172 c609f802 Michael Hanselmann
1173 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1174 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1175 f1da30e6 Michael Hanselmann
1176 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1177 d7fd1f28 Michael Hanselmann
                  ", ".join(job.id for job in archive_jobs))
1178 d7fd1f28 Michael Hanselmann
1179 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1180 78d12585 Michael Hanselmann
1181 07cd723a Iustin Pop
  @utils.LockedMethod
1182 07cd723a Iustin Pop
  @_RequireOpenQueue
1183 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1184 07cd723a Iustin Pop
    """Archives a job.
1185 07cd723a Iustin Pop

1186 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1187 ea03467c Iustin Pop

1188 07cd723a Iustin Pop
    @type job_id: string
1189 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1190 78d12585 Michael Hanselmann
    @rtype: bool
1191 78d12585 Michael Hanselmann
    @return: Whether job was archived
1192 07cd723a Iustin Pop

1193 07cd723a Iustin Pop
    """
1194 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1195 78d12585 Michael Hanselmann
1196 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1197 78d12585 Michael Hanselmann
    if not job:
1198 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1199 78d12585 Michael Hanselmann
      return False
1200 78d12585 Michael Hanselmann
1201 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1202 07cd723a Iustin Pop
1203 07cd723a Iustin Pop
  @utils.LockedMethod
1204 07cd723a Iustin Pop
  @_RequireOpenQueue
1205 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1206 07cd723a Iustin Pop
    """Archives all jobs based on age.
1207 07cd723a Iustin Pop

1208 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1209 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1210 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1211 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1212 07cd723a Iustin Pop

1213 07cd723a Iustin Pop
    @type age: int
1214 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1215 07cd723a Iustin Pop

1216 07cd723a Iustin Pop
    """
1217 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1218 07cd723a Iustin Pop
1219 07cd723a Iustin Pop
    now = time.time()
1220 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1221 f8ad5591 Michael Hanselmann
    archived_count = 0
1222 f8ad5591 Michael Hanselmann
    last_touched = 0
1223 f8ad5591 Michael Hanselmann
1224 f8ad5591 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1225 d7fd1f28 Michael Hanselmann
    pending = []
1226 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1227 f8ad5591 Michael Hanselmann
      last_touched = idx
1228 f8ad5591 Michael Hanselmann
1229 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1230 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1231 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1232 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1233 f8ad5591 Michael Hanselmann
        break
1234 f8ad5591 Michael Hanselmann
1235 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1236 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1237 f8ad5591 Michael Hanselmann
      if job:
1238 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1239 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1240 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1241 f8ad5591 Michael Hanselmann
          else:
1242 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1243 07cd723a Iustin Pop
        else:
1244 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1245 f8ad5591 Michael Hanselmann
1246 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1247 d7fd1f28 Michael Hanselmann
          pending.append(job)
1248 d7fd1f28 Michael Hanselmann
1249 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1250 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1251 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1252 d7fd1f28 Michael Hanselmann
            pending = []
1253 f8ad5591 Michael Hanselmann
1254 d7fd1f28 Michael Hanselmann
    if pending:
1255 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1256 07cd723a Iustin Pop
1257 f8ad5591 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched - 1)
1258 07cd723a Iustin Pop
1259 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1260 ea03467c Iustin Pop
    """Returns information about a job.
1261 ea03467c Iustin Pop

1262 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1263 ea03467c Iustin Pop
    @param job: the job which we query
1264 ea03467c Iustin Pop
    @type fields: list
1265 ea03467c Iustin Pop
    @param fields: names of fields to return
1266 ea03467c Iustin Pop
    @rtype: list
1267 ea03467c Iustin Pop
    @return: list with one element for each field
1268 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1269 ea03467c Iustin Pop
        has been passed
1270 ea03467c Iustin Pop

1271 ea03467c Iustin Pop
    """
1272 e2715f69 Michael Hanselmann
    row = []
1273 e2715f69 Michael Hanselmann
    for fname in fields:
1274 e2715f69 Michael Hanselmann
      if fname == "id":
1275 e2715f69 Michael Hanselmann
        row.append(job.id)
1276 e2715f69 Michael Hanselmann
      elif fname == "status":
1277 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1278 af30b2fd Michael Hanselmann
      elif fname == "ops":
1279 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1280 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1281 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1282 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1283 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1284 5b23c34c Iustin Pop
      elif fname == "oplog":
1285 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1286 c56ec146 Iustin Pop
      elif fname == "opstart":
1287 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1288 c56ec146 Iustin Pop
      elif fname == "opend":
1289 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1290 c56ec146 Iustin Pop
      elif fname == "received_ts":
1291 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1292 c56ec146 Iustin Pop
      elif fname == "start_ts":
1293 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1294 c56ec146 Iustin Pop
      elif fname == "end_ts":
1295 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1296 60dd1473 Iustin Pop
      elif fname == "summary":
1297 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1298 e2715f69 Michael Hanselmann
      else:
1299 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1300 e2715f69 Michael Hanselmann
    return row
1301 e2715f69 Michael Hanselmann
1302 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1303 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1304 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1305 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1306 e2715f69 Michael Hanselmann

1307 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1308 ea03467c Iustin Pop
    processing for each job.
1309 ea03467c Iustin Pop

1310 ea03467c Iustin Pop
    @type job_ids: list
1311 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1312 ea03467c Iustin Pop
    @type fields: list
1313 ea03467c Iustin Pop
    @param fields: names of fields to return
1314 ea03467c Iustin Pop
    @rtype: list
1315 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1316 ea03467c Iustin Pop
        the requested fields
1317 e2715f69 Michael Hanselmann

1318 e2715f69 Michael Hanselmann
    """
1319 85f03e0d Michael Hanselmann
    jobs = []
1320 e2715f69 Michael Hanselmann
1321 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1322 85f03e0d Michael Hanselmann
      if job is None:
1323 85f03e0d Michael Hanselmann
        jobs.append(None)
1324 85f03e0d Michael Hanselmann
      else:
1325 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1326 e2715f69 Michael Hanselmann
1327 85f03e0d Michael Hanselmann
    return jobs
1328 e2715f69 Michael Hanselmann
1329 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1330 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1331 e2715f69 Michael Hanselmann
  def Shutdown(self):
1332 e2715f69 Michael Hanselmann
    """Stops the job queue.
1333 e2715f69 Michael Hanselmann

1334 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1335 ea03467c Iustin Pop

1336 e2715f69 Michael Hanselmann
    """
1337 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1338 85f03e0d Michael Hanselmann
1339 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1340 04ab05ce Michael Hanselmann
    self._queue_lock = None