Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ acd65a16

History | View | Annotate | Download (42 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
# Number of worker threads started for processing jobs (passed to the
# job queue worker pool).
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
# NOTE(review): presumably the number of archived jobs grouped into one
# archive directory; the code using it is not visible here -- confirm.
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception used to abort the processing of a job being cancelled.

  It is raised while an opcode is being started and is caught by the job
  worker, which then cancels the job.

  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
  """Return the current time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format, as
      produced by L{utils.SplitTime}

  """
  now = time.time()
  return utils.SplitTime(now)
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
72 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
73 e2715f69 Michael Hanselmann

74 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
75 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
76 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
77 ea03467c Iustin Pop
  @ivar status: the current status
78 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
79 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
80 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
82 f1048938 Iustin Pop

83 e2715f69 Michael Hanselmann
  """
84 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
85 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
86 66d895a8 Iustin Pop
               "__weakref__"]
87 66d895a8 Iustin Pop
88 85f03e0d Michael Hanselmann
  def __init__(self, op):
89 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
90 ea03467c Iustin Pop

91 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
92 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
93 ea03467c Iustin Pop

94 ea03467c Iustin Pop
    """
95 85f03e0d Michael Hanselmann
    self.input = op
96 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
97 85f03e0d Michael Hanselmann
    self.result = None
98 85f03e0d Michael Hanselmann
    self.log = []
99 70552c46 Michael Hanselmann
    self.start_timestamp = None
100 b9b5abcb Iustin Pop
    self.exec_timestamp = None
101 70552c46 Michael Hanselmann
    self.end_timestamp = None
102 f1da30e6 Michael Hanselmann
103 f1da30e6 Michael Hanselmann
  @classmethod
104 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
105 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
106 ea03467c Iustin Pop

107 ea03467c Iustin Pop
    @type state: dict
108 ea03467c Iustin Pop
    @param state: the serialized state
109 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
110 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
111 ea03467c Iustin Pop

112 ea03467c Iustin Pop
    """
113 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
114 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115 85f03e0d Michael Hanselmann
    obj.status = state["status"]
116 85f03e0d Michael Hanselmann
    obj.result = state["result"]
117 85f03e0d Michael Hanselmann
    obj.log = state["log"]
118 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
119 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
120 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
121 f1da30e6 Michael Hanselmann
    return obj
122 f1da30e6 Michael Hanselmann
123 f1da30e6 Michael Hanselmann
  def Serialize(self):
124 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    @rtype: dict
127 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
128 ea03467c Iustin Pop

129 ea03467c Iustin Pop
    """
130 6c5a7090 Michael Hanselmann
    return {
131 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
132 6c5a7090 Michael Hanselmann
      "status": self.status,
133 6c5a7090 Michael Hanselmann
      "result": self.result,
134 6c5a7090 Michael Hanselmann
      "log": self.log,
135 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
136 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
137 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
138 6c5a7090 Michael Hanselmann
      }
139 f1048938 Iustin Pop
140 e2715f69 Michael Hanselmann
141 e2715f69 Michael Hanselmann
class _QueuedJob(object):
142 e2715f69 Michael Hanselmann
  """In-memory job representation.
143 e2715f69 Michael Hanselmann

144 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
145 ea03467c Iustin Pop
  be taken care of by users of this class.
146 ea03467c Iustin Pop

147 ea03467c Iustin Pop
  @type queue: L{JobQueue}
148 ea03467c Iustin Pop
  @ivar queue: the parent queue
149 ea03467c Iustin Pop
  @ivar id: the job ID
150 ea03467c Iustin Pop
  @type ops: list
151 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
152 ea03467c Iustin Pop
  @type log_serial: int
153 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
154 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
155 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
156 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
157 ef2df7d3 Michael Hanselmann
  @ivar lock_status: In-memory locking information for debugging
158 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
159 e2715f69 Michael Hanselmann

160 e2715f69 Michael Hanselmann
  """
161 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
162 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
163 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
164 ef2df7d3 Michael Hanselmann
               "lock_status", "change",
165 66d895a8 Iustin Pop
               "__weakref__"]
166 66d895a8 Iustin Pop
167 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
168 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
169 ea03467c Iustin Pop

170 ea03467c Iustin Pop
    @type queue: L{JobQueue}
171 ea03467c Iustin Pop
    @param queue: our parent queue
172 ea03467c Iustin Pop
    @type job_id: job_id
173 ea03467c Iustin Pop
    @param job_id: our job id
174 ea03467c Iustin Pop
    @type ops: list
175 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
176 ea03467c Iustin Pop
        in _QueuedOpCodes
177 ea03467c Iustin Pop

178 ea03467c Iustin Pop
    """
179 e2715f69 Michael Hanselmann
    if not ops:
180 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
181 e2715f69 Michael Hanselmann
182 85f03e0d Michael Hanselmann
    self.queue = queue
183 f1da30e6 Michael Hanselmann
    self.id = job_id
184 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
185 6c5a7090 Michael Hanselmann
    self.log_serial = 0
186 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
187 c56ec146 Iustin Pop
    self.start_timestamp = None
188 c56ec146 Iustin Pop
    self.end_timestamp = None
189 6c5a7090 Michael Hanselmann
190 ef2df7d3 Michael Hanselmann
    # In-memory attributes
191 ef2df7d3 Michael Hanselmann
    self.lock_status = None
192 ef2df7d3 Michael Hanselmann
193 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
194 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
195 f1da30e6 Michael Hanselmann
196 9fa2e150 Michael Hanselmann
  def __repr__(self):
197 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
198 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
199 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
200 9fa2e150 Michael Hanselmann
201 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
202 9fa2e150 Michael Hanselmann
203 f1da30e6 Michael Hanselmann
  @classmethod
204 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
205 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
206 ea03467c Iustin Pop

207 ea03467c Iustin Pop
    @type queue: L{JobQueue}
208 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
209 ea03467c Iustin Pop
    @type state: dict
210 ea03467c Iustin Pop
    @param state: the serialized state
211 ea03467c Iustin Pop
    @rtype: _JobQueue
212 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
213 ea03467c Iustin Pop

214 ea03467c Iustin Pop
    """
215 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
216 85f03e0d Michael Hanselmann
    obj.queue = queue
217 85f03e0d Michael Hanselmann
    obj.id = state["id"]
218 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
219 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
220 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
221 6c5a7090 Michael Hanselmann
222 ef2df7d3 Michael Hanselmann
    # In-memory attributes
223 ef2df7d3 Michael Hanselmann
    obj.lock_status = None
224 ef2df7d3 Michael Hanselmann
225 6c5a7090 Michael Hanselmann
    obj.ops = []
226 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
227 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
228 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
229 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
230 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
231 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
232 6c5a7090 Michael Hanselmann
233 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
234 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
235 6c5a7090 Michael Hanselmann
236 f1da30e6 Michael Hanselmann
    return obj
237 f1da30e6 Michael Hanselmann
238 f1da30e6 Michael Hanselmann
  def Serialize(self):
239 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
240 ea03467c Iustin Pop

241 ea03467c Iustin Pop
    @rtype: dict
242 ea03467c Iustin Pop
    @return: the serialized state
243 ea03467c Iustin Pop

244 ea03467c Iustin Pop
    """
245 f1da30e6 Michael Hanselmann
    return {
246 f1da30e6 Michael Hanselmann
      "id": self.id,
247 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
248 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
249 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
250 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
251 f1da30e6 Michael Hanselmann
      }
252 f1da30e6 Michael Hanselmann
253 85f03e0d Michael Hanselmann
  def CalcStatus(self):
254 ea03467c Iustin Pop
    """Compute the status of this job.
255 ea03467c Iustin Pop

256 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
257 ea03467c Iustin Pop
    based on their status, computes the job status.
258 ea03467c Iustin Pop

259 ea03467c Iustin Pop
    The algorithm is:
260 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
261 ea03467c Iustin Pop
        status will be the same
262 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
263 ea03467c Iustin Pop
          - waitlock
264 fbf0262f Michael Hanselmann
          - canceling
265 ea03467c Iustin Pop
          - running
266 ea03467c Iustin Pop

267 ea03467c Iustin Pop
        will determine the job status
268 ea03467c Iustin Pop

269 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
270 ea03467c Iustin Pop
        and the job status will be the same
271 ea03467c Iustin Pop

272 ea03467c Iustin Pop
    @return: the job status
273 ea03467c Iustin Pop

274 ea03467c Iustin Pop
    """
275 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
276 e2715f69 Michael Hanselmann
277 e2715f69 Michael Hanselmann
    all_success = True
278 85f03e0d Michael Hanselmann
    for op in self.ops:
279 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
280 e2715f69 Michael Hanselmann
        continue
281 e2715f69 Michael Hanselmann
282 e2715f69 Michael Hanselmann
      all_success = False
283 e2715f69 Michael Hanselmann
284 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
285 e2715f69 Michael Hanselmann
        pass
286 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
287 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
288 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
289 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
290 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
291 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
292 fbf0262f Michael Hanselmann
        break
293 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
294 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
295 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
296 f1da30e6 Michael Hanselmann
        break
297 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
298 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
299 4cb1d919 Michael Hanselmann
        break
300 e2715f69 Michael Hanselmann
301 e2715f69 Michael Hanselmann
    if all_success:
302 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
303 e2715f69 Michael Hanselmann
304 e2715f69 Michael Hanselmann
    return status
305 e2715f69 Michael Hanselmann
306 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
307 ea03467c Iustin Pop
    """Selectively returns the log entries.
308 ea03467c Iustin Pop

309 ea03467c Iustin Pop
    @type newer_than: None or int
310 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
311 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
312 ea03467c Iustin Pop
        than this value
313 ea03467c Iustin Pop
    @rtype: list
314 ea03467c Iustin Pop
    @return: the list of the log entries selected
315 ea03467c Iustin Pop

316 ea03467c Iustin Pop
    """
317 6c5a7090 Michael Hanselmann
    if newer_than is None:
318 6c5a7090 Michael Hanselmann
      serial = -1
319 6c5a7090 Michael Hanselmann
    else:
320 6c5a7090 Michael Hanselmann
      serial = newer_than
321 6c5a7090 Michael Hanselmann
322 6c5a7090 Michael Hanselmann
    entries = []
323 6c5a7090 Michael Hanselmann
    for op in self.ops:
324 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325 6c5a7090 Michael Hanselmann
326 6c5a7090 Michael Hanselmann
    return entries
327 6c5a7090 Michael Hanselmann
328 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
329 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
330 34327f51 Iustin Pop

331 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
332 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
333 34327f51 Iustin Pop
    finalised are not changed.
334 34327f51 Iustin Pop

335 34327f51 Iustin Pop
    @param status: a given opcode status
336 34327f51 Iustin Pop
    @param result: the opcode result
337 34327f51 Iustin Pop

338 34327f51 Iustin Pop
    """
339 34327f51 Iustin Pop
    not_marked = True
340 34327f51 Iustin Pop
    for op in self.ops:
341 34327f51 Iustin Pop
      if op.status in constants.OPS_FINALIZED:
342 34327f51 Iustin Pop
        assert not_marked, "Finalized opcodes found after non-finalized ones"
343 34327f51 Iustin Pop
        continue
344 34327f51 Iustin Pop
      op.status = status
345 34327f51 Iustin Pop
      op.result = result
346 34327f51 Iustin Pop
      not_marked = False
347 34327f51 Iustin Pop
348 f1048938 Iustin Pop
349 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks given to the opcode processor while executing one opcode.

  The job and opcode state is updated while holding the queue lock,
  except in L{ReportLocks}.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was marked as canceling in the meantime

    """
    self._queue.acquire()
    try:
      current = self._op.status
      assert current in (constants.OP_STATUS_WAITLOCK,
                         constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if current == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Called either with the message only, in which case the log type is
    C{constants.ELOG_MESSAGE}, or with C{(log_type, log_msg)}.

    """
    nargs = len(args)
    assert nargs < 3

    if nargs == 1:
      (log_msg, ) = args
      log_type = constants.ELOG_MESSAGE
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision. It is taken before waiting for the queue lock.
    timestamp = TimeStampNow()

    self._queue.acquire()
    try:
      serial = self._job.log_serial + 1
      self._job.log_serial = serial
      self._op.log.append((serial, timestamp, log_type, log_msg))

      # Wake up everybody waiting for changes of this job
      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
428 ef2df7d3 Michael Hanselmann
429 031a3e57 Michael Hanselmann
430 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
431 031a3e57 Michael Hanselmann
  """The actual job workers.
432 031a3e57 Michael Hanselmann

433 031a3e57 Michael Hanselmann
  """
434 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
435 e2715f69 Michael Hanselmann
    """Job executor.
436 e2715f69 Michael Hanselmann

437 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
438 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
439 e2715f69 Michael Hanselmann

440 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
441 ea03467c Iustin Pop
    @param job: the job to be processed
442 ea03467c Iustin Pop

443 e2715f69 Michael Hanselmann
    """
444 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
445 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
446 031a3e57 Michael Hanselmann
    queue = job.queue
447 e2715f69 Michael Hanselmann
    try:
448 85f03e0d Michael Hanselmann
      try:
449 85f03e0d Michael Hanselmann
        count = len(job.ops)
450 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
451 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
452 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
453 f6424741 Iustin Pop
            # this is a job that was partially completed before master
454 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
455 f6424741 Iustin Pop
            # are already completed successfully (if any did error
456 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
457 f6424741 Iustin Pop
            # resubmitted for processing)
458 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
459 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
460 f6424741 Iustin Pop
            continue
461 85f03e0d Michael Hanselmann
          try:
462 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
463 d21d09d6 Iustin Pop
                         op_summary)
464 85f03e0d Michael Hanselmann
465 85f03e0d Michael Hanselmann
            queue.acquire()
466 85f03e0d Michael Hanselmann
            try:
467 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
468 df0fb067 Iustin Pop
                raise CancelJob()
469 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
470 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
471 85f03e0d Michael Hanselmann
              op.result = None
472 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
473 c56ec146 Iustin Pop
              if idx == 0: # first opcode
474 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
475 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
476 85f03e0d Michael Hanselmann
477 38206f3c Iustin Pop
              input_opcode = op.input
478 85f03e0d Michael Hanselmann
            finally:
479 85f03e0d Michael Hanselmann
              queue.release()
480 85f03e0d Michael Hanselmann
481 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
482 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
483 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
484 85f03e0d Michael Hanselmann
485 85f03e0d Michael Hanselmann
            queue.acquire()
486 85f03e0d Michael Hanselmann
            try:
487 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
488 85f03e0d Michael Hanselmann
              op.result = result
489 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
490 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
491 85f03e0d Michael Hanselmann
            finally:
492 85f03e0d Michael Hanselmann
              queue.release()
493 85f03e0d Michael Hanselmann
494 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
495 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
496 fbf0262f Michael Hanselmann
          except CancelJob:
497 fbf0262f Michael Hanselmann
            # Will be handled further up
498 fbf0262f Michael Hanselmann
            raise
499 85f03e0d Michael Hanselmann
          except Exception, err:
500 85f03e0d Michael Hanselmann
            queue.acquire()
501 85f03e0d Michael Hanselmann
            try:
502 85f03e0d Michael Hanselmann
              try:
503 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
504 bcb66fca Iustin Pop
                if isinstance(err, errors.GenericError):
505 bcb66fca Iustin Pop
                  op.result = errors.EncodeException(err)
506 bcb66fca Iustin Pop
                else:
507 bcb66fca Iustin Pop
                  op.result = str(err)
508 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
509 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
510 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
511 85f03e0d Michael Hanselmann
              finally:
512 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
513 85f03e0d Michael Hanselmann
            finally:
514 85f03e0d Michael Hanselmann
              queue.release()
515 85f03e0d Michael Hanselmann
            raise
516 85f03e0d Michael Hanselmann
517 fbf0262f Michael Hanselmann
      except CancelJob:
518 fbf0262f Michael Hanselmann
        queue.acquire()
519 fbf0262f Michael Hanselmann
        try:
520 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
521 fbf0262f Michael Hanselmann
        finally:
522 fbf0262f Michael Hanselmann
          queue.release()
523 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
524 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
525 85f03e0d Michael Hanselmann
      except:
526 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
527 e2715f69 Michael Hanselmann
    finally:
528 85f03e0d Michael Hanselmann
      queue.acquire()
529 85f03e0d Michael Hanselmann
      try:
530 65548ed5 Michael Hanselmann
        try:
531 ef2df7d3 Michael Hanselmann
          job.lock_status = None
532 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
533 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
534 65548ed5 Michael Hanselmann
        finally:
535 65548ed5 Michael Hanselmann
          job_id = job.id
536 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
537 85f03e0d Michael Hanselmann
      finally:
538 85f03e0d Michael Hanselmann
        queue.release()
539 ef2df7d3 Michael Hanselmann
540 02fc74da Michael Hanselmann
      logging.info("Finished job %s, status = %s", job_id, status)
541 e2715f69 Michael Hanselmann
542 e2715f69 Michael Hanselmann
543 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running the job queue workers.

  """
  def __init__(self, queue):
    """Create the pool with L{JOBQUEUE_THREADS} L{_JobQueueWorker} workers.

    @param queue: the job queue, kept in C{self.queue} so that the
        workers can reach it through their pool

    """
    base = super(_JobQueueWorkerPool, self)
    base.__init__("JobQueue", JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
552 e2715f69 Michael Hanselmann
553 e2715f69 Michael Hanselmann
554 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
555 6c881c52 Iustin Pop
  """Decorator for "public" functions.
556 ea03467c Iustin Pop

557 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
558 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
559 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
560 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
561 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
562 ea03467c Iustin Pop

563 6c881c52 Iustin Pop
  @warning: Use this decorator only after utils.LockedMethod!
564 f1da30e6 Michael Hanselmann

565 6c881c52 Iustin Pop
  Example::
566 6c881c52 Iustin Pop
    @utils.LockedMethod
567 6c881c52 Iustin Pop
    @_RequireOpenQueue
568 6c881c52 Iustin Pop
    def Example(self):
569 6c881c52 Iustin Pop
      pass
570 db37da70 Michael Hanselmann

571 6c881c52 Iustin Pop
  """
572 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
573 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
574 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
575 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
576 6c881c52 Iustin Pop
  return wrapper
577 db37da70 Michael Hanselmann
578 db37da70 Michael Hanselmann
579 6c881c52 Iustin Pop
class JobQueue(object):
580 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
581 db37da70 Michael Hanselmann

582 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
583 6c881c52 Iustin Pop

584 6c881c52 Iustin Pop
  """
585 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
586 db37da70 Michael Hanselmann
587 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Besides setting up the in-memory state, the constructor acquires
    the queue file lock, reads the serial file, records the master
    candidates other than this node and then inspects all job files on
    disk: jobs which are still queued are handed to the worker pool,
    while jobs found in the running, waiting-for-lock or canceling
    state get their unfinished opcodes marked as failed and are written
    back.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking: one lock, exposed through acquire/release shortcuts
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes: master candidates, name -> primary IP
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip

    # Remove master node (it might not be in the list at all)
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        final_idx = jobs_count - 1
        interrupted_states = (constants.JOB_STATUS_RUNNING,
                              constants.JOB_STATUS_WAITLOCK,
                              constants.JOB_STATUS_CANCELING)
        lastinfo = time.time()

        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == final_idx):
            percent = 100.0 * (idx + 1) / jobs_count
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, final_idx, percent)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status == constants.JOB_STATUS_QUEUED:
            self._wpool.AddTask(job)
            continue

          if status in interrupted_states:
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              # the job is written back even if marking the opcodes failed
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # do not leave worker threads behind if the inspection failed
      self._wpool.TerminateWorkers()
      raise
682 85f03e0d Michael Hanselmann
683 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A node which is
    not a master candidate is then dropped from the list of nodes and
    nothing is uploaded to it; otherwise all current (non-archived) job
    files and the serial file are uploaded and the node is added to the
    list of nodes. RPC failures are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    to_upload = []
    for job_id in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(job_id))

    # Upload current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Read file content
      content = utils.ReadFile(path)

      upload = rpc.RpcRunner.call_jobqueue_update([name], [node.primary_ip],
                                                  path, content)
      upload_error = upload[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    self._nodes[name] = node.primary_ip
727 d2e03a33 Michael Hanselmann
728 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    The node is dropped from the list of nodes; unknown names are
    silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)
742 23752136 Michael Hanselmann
743 7e950d31 Iustin Pop
  @staticmethod
744 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
745 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
746 ea03467c Iustin Pop

747 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
748 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
749 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
750 ea03467c Iustin Pop

751 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
752 ea03467c Iustin Pop
    @type nodes: list
753 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
754 ea03467c Iustin Pop
    @type failmsg: str
755 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
756 ea03467c Iustin Pop

757 ea03467c Iustin Pop
    """
758 e74798c1 Michael Hanselmann
    failed = []
759 e74798c1 Michael Hanselmann
    success = []
760 e74798c1 Michael Hanselmann
761 e74798c1 Michael Hanselmann
    for node in nodes:
762 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
763 c8457ce7 Iustin Pop
      if msg:
764 e74798c1 Michael Hanselmann
        failed.append(node)
765 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
766 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
767 c8457ce7 Iustin Pop
      else:
768 c8457ce7 Iustin Pop
        success.append(node)
769 e74798c1 Michael Hanselmann
770 e74798c1 Michael Hanselmann
    # +1 for the master node
771 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
772 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
773 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
774 e74798c1 Michael Hanselmann
775 99aabbed Iustin Pop
  def _GetNodeIp(self):
776 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
777 99aabbed Iustin Pop

778 ea03467c Iustin Pop
    @rtype: (list, list)
779 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
780 ea03467c Iustin Pop
        names and the second one with the node addresses
781 ea03467c Iustin Pop

782 99aabbed Iustin Pop
    """
783 99aabbed Iustin Pop
    name_list = self._nodes.keys()
784 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
785 99aabbed Iustin Pop
    return name_list, addr_list
786 99aabbed Iustin Pop
787 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The local file is written first; afterwards the same name and
    contents are sent to the nodes in C{self._nodes} and the outcome of
    that RPC call is checked (failures are logged).

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    description = "Updating %s" % file_name
    self._CheckRpcResult(rpc_result, self._nodes, description)
805 23752136 Michael Hanselmann
806 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    Every (old, new) pair is renamed on the local node first; the same
    list of renames is then sent to the nodes in C{self._nodes} and the
    outcome of that RPC call is checked (failures are logged).

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... and on all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    description = "Renaming files (%r)" % rename
    self._CheckRpcResult(rpc_result, self._nodes, description)
824 abc1f2ce Michael Hanselmann
825 7e950d31 Iustin Pop
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or
        is negative

    """
    is_numeric = isinstance(job_id, (int, long))
    if is_numeric and job_id >= 0:
      return str(job_id)

    if not is_numeric:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

    raise errors.ProgrammerError("Job ID %s is negative" % job_id)
845 85f03e0d Michael Hanselmann
846 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    The directory name is the job ID divided by
    C{JOBS_PER_ARCHIVE_DIRECTORY}, rounded down.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: plain "/" would yield a float (and hence a
    # name such as "0.5") whenever true division is in effect
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
857 58b22b6e Michael Hanselmann
858 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    The new highest serial is written to the serial file (and
    replicated) before the in-memory counter is advanced, so if writing
    raises an exception the counter is left unchanged.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of exactly C{count} job identifiers, in ascending
        order; the last one equals the serial written to the serial file

    """
    assert count > 0
    # New number: the highest serial handed out by this call
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # The first new serial is the one following the last recorded serial;
    # starting at self._last_serial itself would produce count + 1 entries
    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    assert len(result) == count

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
883 f1da30e6 Michael Hanselmann
884 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<job_id>} and is located in
    C{constants.QUEUE_DIR}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
895 f1da30e6 Michael Hanselmann
896 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<job_id>} and is located in the job's
    archive directory (see L{_GetArchiveDirectory}) below
    C{constants.JOB_QUEUE_ARCHIVE_DIR}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                          file_name)
908 0cb94105 Michael Hanselmann
909 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs). Files in the queue directory whose name does not match
    L{_RE_JOB_FILE} are ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    match_job_file = self._RE_JOB_FILE.match
    visible = utils.ListVisibleFiles(constants.QUEUE_DIR, sort=False)
    matches = [match_job_file(name) for name in visible]
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
930 911a495b Iustin Pop
931 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
932 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
933 ea03467c Iustin Pop

934 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
935 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
936 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
937 ea03467c Iustin Pop

938 ea03467c Iustin Pop
    @param job_id: the job id
939 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
940 ea03467c Iustin Pop
    @return: either None or the job object
941 ea03467c Iustin Pop

942 ea03467c Iustin Pop
    """
943 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
944 5685c1a5 Michael Hanselmann
    if job:
945 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
946 5685c1a5 Michael Hanselmann
      return job
947 ac0930b9 Iustin Pop
948 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
949 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
950 f1da30e6 Michael Hanselmann
    try:
951 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
952 f1da30e6 Michael Hanselmann
    except IOError, err:
953 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
954 f1da30e6 Michael Hanselmann
        return None
955 f1da30e6 Michael Hanselmann
      raise
956 13998ef2 Michael Hanselmann
957 13998ef2 Michael Hanselmann
    data = serializer.LoadJson(raw_data)
958 f1da30e6 Michael Hanselmann
959 94ed59a5 Iustin Pop
    try:
960 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
961 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
962 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
963 94ed59a5 Iustin Pop
      if filepath == new_path:
964 94ed59a5 Iustin Pop
        # job already archived (future case)
965 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
966 94ed59a5 Iustin Pop
      else:
967 94ed59a5 Iustin Pop
        # non-archived case
968 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
969 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
970 94ed59a5 Iustin Pop
      return None
971 94ed59a5 Iustin Pop
972 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
973 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
974 ac0930b9 Iustin Pop
    return job
975 f1da30e6 Michael Hanselmann
976 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
977 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
978 ea03467c Iustin Pop

979 ea03467c Iustin Pop
    @type job_ids: list
980 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
981 ea03467c Iustin Pop
        or a list of job IDs
982 ea03467c Iustin Pop
    @rtype: list
983 ea03467c Iustin Pop
    @return: the list of job objects
984 ea03467c Iustin Pop

985 ea03467c Iustin Pop
    """
986 911a495b Iustin Pop
    if not job_ids:
987 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
988 f1da30e6 Michael Hanselmann
989 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
990 f1da30e6 Michael Hanselmann
991 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
1003 686d7433 Iustin Pop
1004 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1005 20571a26 Guido Trotter
    """Update the queue size.
1006 20571a26 Guido Trotter

1007 20571a26 Guido Trotter
    """
1008 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1009 20571a26 Guido Trotter
1010 20571a26 Guido Trotter
  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    Setting the flag writes an empty drain file, clearing it removes
    that file; the in-memory copy of the flag is updated afterwards.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    if not drain_flag:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    else:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)

    self._drained = drain_flag

    return True
1027 3ccafd0e Iustin Pop
1028 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining
    @raise errors.JobQueueFull: if the queue size has reached
        C{constants.JOB_QUEUE_SIZE_HARD_LIMIT}

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    queue_is_full = self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT
    if queue_is_full:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # Write to disk; the size counter is only bumped once this succeeded
    self.UpdateJobUnlocked(new_job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    # Add to worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
1066 f1da30e6 Michael Hanselmann
1067 2971c913 Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    A new job ID is allocated and the job is submitted under it.

    @see: L{_SubmitJobUnlocked}

    """
    serials = self._NewSerialsUnlocked(1)
    new_id = serials[0]
    return self._SubmitJobUnlocked(new_id, ops)
1077 2971c913 Iustin Pop
1078 2971c913 Iustin Pop
  @utils.LockedMethod
1079 2971c913 Iustin Pop
  @_RequireOpenQueue
1080 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1081 2971c913 Iustin Pop
    """Create and store multiple jobs.
1082 2971c913 Iustin Pop

1083 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1084 2971c913 Iustin Pop

1085 2971c913 Iustin Pop
    """
1086 2971c913 Iustin Pop
    results = []
1087 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1088 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1089 2971c913 Iustin Pop
      try:
1090 009e73d0 Iustin Pop
        data = self._SubmitJobUnlocked(job_id, ops)
1091 2971c913 Iustin Pop
        status = True
1092 2971c913 Iustin Pop
      except errors.GenericError, err:
1093 2971c913 Iustin Pop
        data = str(err)
1094 2971c913 Iustin Pop
        status = False
1095 2971c913 Iustin Pop
      results.append((status, data))
1096 2971c913 Iustin Pop
1097 2971c913 Iustin Pop
    return results
1098 2971c913 Iustin Pop
1099 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes. Once written, everybody waiting on the job's C{change}
    condition is notified.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Notify waiters about potential changes
    job.change.notifyAll()
1118 dfe57c22 Michael Hanselmann
1119 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

        if the job could not be loaded at all, C{None} is returned

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _PollJob():
      """Returns the current job data, or asks for a retry if unchanged.

      """
      logging.debug("Waiting for changes in job %s", job_id)

      cur_status = job.CalcStatus()
      cur_info = self._GetJobInfoUnlocked(job, fields)
      cur_log = job.GetLogEntries(prev_log_serial)

      # The values sent by the client went through a JSON round trip, which
      # can change types (e.g. tuple to list) or lose precision. Put our own
      # data through the same round trip, so that the comparison below does
      # not report a change where there is no significant difference.
      cur_info = serializer.LoadJson(serializer.DumpJson(cur_info))
      cur_log = serializer.LoadJson(serializer.DumpJson(cur_log))

      # A job which is no longer queued, running or waiting for locks will
      # not change anymore, hence there is no point in waiting for it
      job_finished = cur_status not in (constants.JOB_STATUS_QUEUED,
                                        constants.JOB_STATUS_RUNNING,
                                        constants.JOB_STATUS_WAITLOCK)
      info_differs = prev_job_info != cur_info
      has_new_log = bool(cur_log) and prev_log_serial != cur_log[0][0]

      if not (job_finished or info_differs or has_new_log):
        raise utils.RetryAgain()

      logging.debug("Job %s changed", job_id)
      return (cur_info, cur_log)

    try:
      # Setting wait function to release the queue lock while waiting
      result = utils.Retry(_PollJob, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=job.change.wait)
    except utils.RetryTimeout:
      result = constants.JOB_NOTCHANGED

    return result
1183 dfe57c22 Michael Hanselmann
1184 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet, i.e. if it is
    either still queued or waiting for its locks.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}, where C{success} is a boolean and
        C{message} a string describing what was done or why nothing
        was done

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status == constants.JOB_STATUS_QUEUED:
      # Nobody is working on the job yet, it can be canceled right away
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        # Store the job even if marking the opcodes raised
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Any other status cannot be canceled anymore
    logging.debug("Job %s is no longer waiting in the queue", job.id)
    return (False, "Job %s is no longer waiting in the queue" % job.id)
1220 fbf0262f Michael Hanselmann
1221 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    The job's unfinished opcodes are marked through
    C{job.MarkUnfinishedOps} with L{constants.OP_STATUS_CANCELED} and a
    fixed message. The job is then stored via L{UpdateJobUnlocked}, which
    happens even if marking the opcodes raised an exception.

    @type job: L{_QueuedJob}
    @param job: the job to be canceled

    """
    cancel_msg = "Job canceled by request"
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, cancel_msg)
    finally:
      self.UpdateJobUnlocked(job)
1231 188c5e0a Michael Hanselmann
1232 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Only jobs whose status is canceled, success or error are archived;
    all other jobs are skipped with a debug message. If no job
    qualifies, nothing is renamed and the function returns 0 right away.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    if not archive_jobs:
      # Nothing to do: don't invoke the rename helper with an empty list and
      # don't claim success for an archival which never happened
      return 0

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
1269 78d12585 Michael Hanselmann
1270 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}, which is called
    with the single job loaded for C{job_id}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; False is also returned if the job
        could not be loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived = self._ArchiveJobsUnlocked([job])
      return archived == 1

    logging.debug("Job %s not found", job_id)
    return False
1291 07cd723a Iustin Pop
1292 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and for jobs without a start timestamp
    the received timestamp. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    Jobs are handed to L{_ArchiveJobsUnlocked} in batches of 10. Once
    more than C{timeout} seconds have passed since the call started, no
    further jobs are examined.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: maximum time in seconds to spend examining jobs
    @rtype: tuple
    @return: C{(archived, left)}, the number of jobs which were archived
        and the number of jobs which were not examined because the
        timeout was reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after passing the timeout check;
      # otherwise the job at which we stopped would be missing from the
      # number of unexamined jobs returned below.
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp available for the job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared with the age
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1347 07cd723a Iustin Pop
1348 7e950d31 Iustin Pop
  @staticmethod
1349 7e950d31 Iustin Pop
  def _GetJobInfoUnlocked(job, fields):
1350 ea03467c Iustin Pop
    """Returns information about a job.
1351 ea03467c Iustin Pop

1352 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1353 ea03467c Iustin Pop
    @param job: the job which we query
1354 ea03467c Iustin Pop
    @type fields: list
1355 ea03467c Iustin Pop
    @param fields: names of fields to return
1356 ea03467c Iustin Pop
    @rtype: list
1357 ea03467c Iustin Pop
    @return: list with one element for each field
1358 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1359 ea03467c Iustin Pop
        has been passed
1360 ea03467c Iustin Pop

1361 ea03467c Iustin Pop
    """
1362 e2715f69 Michael Hanselmann
    row = []
1363 e2715f69 Michael Hanselmann
    for fname in fields:
1364 e2715f69 Michael Hanselmann
      if fname == "id":
1365 e2715f69 Michael Hanselmann
        row.append(job.id)
1366 e2715f69 Michael Hanselmann
      elif fname == "status":
1367 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1368 af30b2fd Michael Hanselmann
      elif fname == "ops":
1369 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1370 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1371 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1372 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1373 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1374 5b23c34c Iustin Pop
      elif fname == "oplog":
1375 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1376 c56ec146 Iustin Pop
      elif fname == "opstart":
1377 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1378 b9b5abcb Iustin Pop
      elif fname == "opexec":
1379 b9b5abcb Iustin Pop
        row.append([op.exec_timestamp for op in job.ops])
1380 c56ec146 Iustin Pop
      elif fname == "opend":
1381 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1382 c56ec146 Iustin Pop
      elif fname == "received_ts":
1383 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1384 c56ec146 Iustin Pop
      elif fname == "start_ts":
1385 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1386 c56ec146 Iustin Pop
      elif fname == "end_ts":
1387 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1388 1d2dcdfd Michael Hanselmann
      elif fname == "lock_status":
1389 1d2dcdfd Michael Hanselmann
        row.append(job.lock_status)
1390 60dd1473 Iustin Pop
      elif fname == "summary":
1391 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1392 e2715f69 Michael Hanselmann
      else:
1393 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1394 e2715f69 Michael Hanselmann
    return row
1395 e2715f69 Michael Hanselmann
1396 85f03e0d Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    The jobs are retrieved through L{_GetJobsUnlocked}, and the requested
    fields of every job are computed by L{_GetJobInfoUnlocked}.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields; for entries for which
        L{_GetJobsUnlocked} returned C{None}, the element is C{None}

    """
    result = []

    for entry in self._GetJobsUnlocked(job_ids):
      if entry is not None:
        entry = self._GetJobInfoUnlocked(entry, fields)
      # A None entry is passed through unchanged
      result.append(entry)

    return result
1422 e2715f69 Michael Hanselmann
1423 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue: the
    workers of the pool are terminated first, then the queue's file lock
    object is closed and the reference to it is dropped.

    """
    self._wpool.TerminateWorkers()

    # Close the lock object and forget about it
    # NOTE(review): presumably L{_RequireOpenQueue} refuses further calls
    # once the attribute is None -- confirm against its definition
    self._queue_filelock.Close()
    self._queue_filelock = None