Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ c4feafe8

History | View | Annotate | Download (41.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
56 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
57 fbf0262f Michael Hanselmann

58 fbf0262f Michael Hanselmann
  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
62 ea03467c Iustin Pop
  """Returns the current timestamp.
63 ea03467c Iustin Pop

64 ea03467c Iustin Pop
  @rtype: tuple
65 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
66 ea03467c Iustin Pop

67 ea03467c Iustin Pop
  """
68 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
72 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
73 e2715f69 Michael Hanselmann

74 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
75 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
76 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
77 ea03467c Iustin Pop
  @ivar status: the current status
78 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
79 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
80 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
81 f1048938 Iustin Pop

82 e2715f69 Michael Hanselmann
  """
83 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
84 66d895a8 Iustin Pop
               "start_timestamp", "end_timestamp",
85 66d895a8 Iustin Pop
               "__weakref__"]
86 66d895a8 Iustin Pop
87 85f03e0d Michael Hanselmann
  def __init__(self, op):
88 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
89 ea03467c Iustin Pop

90 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
91 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
92 ea03467c Iustin Pop

93 ea03467c Iustin Pop
    """
94 85f03e0d Michael Hanselmann
    self.input = op
95 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
96 85f03e0d Michael Hanselmann
    self.result = None
97 85f03e0d Michael Hanselmann
    self.log = []
98 70552c46 Michael Hanselmann
    self.start_timestamp = None
99 70552c46 Michael Hanselmann
    self.end_timestamp = None
100 f1da30e6 Michael Hanselmann
101 f1da30e6 Michael Hanselmann
  @classmethod
102 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
103 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    @type state: dict
106 ea03467c Iustin Pop
    @param state: the serialized state
107 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
108 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
109 ea03467c Iustin Pop

110 ea03467c Iustin Pop
    """
111 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
112 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113 85f03e0d Michael Hanselmann
    obj.status = state["status"]
114 85f03e0d Michael Hanselmann
    obj.result = state["result"]
115 85f03e0d Michael Hanselmann
    obj.log = state["log"]
116 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
117 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
118 f1da30e6 Michael Hanselmann
    return obj
119 f1da30e6 Michael Hanselmann
120 f1da30e6 Michael Hanselmann
  def Serialize(self):
121 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
122 ea03467c Iustin Pop

123 ea03467c Iustin Pop
    @rtype: dict
124 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    """
127 6c5a7090 Michael Hanselmann
    return {
128 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
129 6c5a7090 Michael Hanselmann
      "status": self.status,
130 6c5a7090 Michael Hanselmann
      "result": self.result,
131 6c5a7090 Michael Hanselmann
      "log": self.log,
132 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
133 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
134 6c5a7090 Michael Hanselmann
      }
135 f1048938 Iustin Pop
136 e2715f69 Michael Hanselmann
137 e2715f69 Michael Hanselmann
class _QueuedJob(object):
138 e2715f69 Michael Hanselmann
  """In-memory job representation.
139 e2715f69 Michael Hanselmann

140 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
141 ea03467c Iustin Pop
  be taken care of by users of this class.
142 ea03467c Iustin Pop

143 ea03467c Iustin Pop
  @type queue: L{JobQueue}
144 ea03467c Iustin Pop
  @ivar queue: the parent queue
145 ea03467c Iustin Pop
  @ivar id: the job ID
146 ea03467c Iustin Pop
  @type ops: list
147 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
148 ea03467c Iustin Pop
  @type log_serial: int
149 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
150 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
151 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
152 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
153 ef2df7d3 Michael Hanselmann
  @ivar lock_status: In-memory locking information for debugging
154 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
155 e2715f69 Michael Hanselmann

156 e2715f69 Michael Hanselmann
  """
157 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
158 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
159 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
160 ef2df7d3 Michael Hanselmann
               "lock_status", "change",
161 66d895a8 Iustin Pop
               "__weakref__"]
162 66d895a8 Iustin Pop
163 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
164 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
165 ea03467c Iustin Pop

166 ea03467c Iustin Pop
    @type queue: L{JobQueue}
167 ea03467c Iustin Pop
    @param queue: our parent queue
168 ea03467c Iustin Pop
    @type job_id: job_id
169 ea03467c Iustin Pop
    @param job_id: our job id
170 ea03467c Iustin Pop
    @type ops: list
171 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
172 ea03467c Iustin Pop
        in _QueuedOpCodes
173 ea03467c Iustin Pop

174 ea03467c Iustin Pop
    """
175 e2715f69 Michael Hanselmann
    if not ops:
176 ea03467c Iustin Pop
      # TODO: use a better exception
177 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
178 e2715f69 Michael Hanselmann
179 85f03e0d Michael Hanselmann
    self.queue = queue
180 f1da30e6 Michael Hanselmann
    self.id = job_id
181 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
182 6c5a7090 Michael Hanselmann
    self.log_serial = 0
183 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
184 c56ec146 Iustin Pop
    self.start_timestamp = None
185 c56ec146 Iustin Pop
    self.end_timestamp = None
186 6c5a7090 Michael Hanselmann
187 ef2df7d3 Michael Hanselmann
    # In-memory attributes
188 ef2df7d3 Michael Hanselmann
    self.lock_status = None
189 ef2df7d3 Michael Hanselmann
190 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
191 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
192 f1da30e6 Michael Hanselmann
193 9fa2e150 Michael Hanselmann
  def __repr__(self):
194 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
195 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
196 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
197 9fa2e150 Michael Hanselmann
198 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
199 9fa2e150 Michael Hanselmann
200 f1da30e6 Michael Hanselmann
  @classmethod
201 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
202 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
203 ea03467c Iustin Pop

204 ea03467c Iustin Pop
    @type queue: L{JobQueue}
205 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
206 ea03467c Iustin Pop
    @type state: dict
207 ea03467c Iustin Pop
    @param state: the serialized state
208 ea03467c Iustin Pop
    @rtype: _JobQueue
209 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
210 ea03467c Iustin Pop

211 ea03467c Iustin Pop
    """
212 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
213 85f03e0d Michael Hanselmann
    obj.queue = queue
214 85f03e0d Michael Hanselmann
    obj.id = state["id"]
215 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
216 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
217 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
218 6c5a7090 Michael Hanselmann
219 ef2df7d3 Michael Hanselmann
    # In-memory attributes
220 ef2df7d3 Michael Hanselmann
    obj.lock_status = None
221 ef2df7d3 Michael Hanselmann
222 6c5a7090 Michael Hanselmann
    obj.ops = []
223 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
224 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
225 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
226 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
227 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
228 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
229 6c5a7090 Michael Hanselmann
230 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
231 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
232 6c5a7090 Michael Hanselmann
233 f1da30e6 Michael Hanselmann
    return obj
234 f1da30e6 Michael Hanselmann
235 f1da30e6 Michael Hanselmann
  def Serialize(self):
236 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
237 ea03467c Iustin Pop

238 ea03467c Iustin Pop
    @rtype: dict
239 ea03467c Iustin Pop
    @return: the serialized state
240 ea03467c Iustin Pop

241 ea03467c Iustin Pop
    """
242 f1da30e6 Michael Hanselmann
    return {
243 f1da30e6 Michael Hanselmann
      "id": self.id,
244 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
245 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
246 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
247 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
248 f1da30e6 Michael Hanselmann
      }
249 f1da30e6 Michael Hanselmann
250 85f03e0d Michael Hanselmann
  def CalcStatus(self):
251 ea03467c Iustin Pop
    """Compute the status of this job.
252 ea03467c Iustin Pop

253 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
254 ea03467c Iustin Pop
    based on their status, computes the job status.
255 ea03467c Iustin Pop

256 ea03467c Iustin Pop
    The algorithm is:
257 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
258 ea03467c Iustin Pop
        status will be the same
259 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
260 ea03467c Iustin Pop
          - waitlock
261 fbf0262f Michael Hanselmann
          - canceling
262 ea03467c Iustin Pop
          - running
263 ea03467c Iustin Pop

264 ea03467c Iustin Pop
        will determine the job status
265 ea03467c Iustin Pop

266 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
267 ea03467c Iustin Pop
        and the job status will be the same
268 ea03467c Iustin Pop

269 ea03467c Iustin Pop
    @return: the job status
270 ea03467c Iustin Pop

271 ea03467c Iustin Pop
    """
272 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
273 e2715f69 Michael Hanselmann
274 e2715f69 Michael Hanselmann
    all_success = True
275 85f03e0d Michael Hanselmann
    for op in self.ops:
276 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
277 e2715f69 Michael Hanselmann
        continue
278 e2715f69 Michael Hanselmann
279 e2715f69 Michael Hanselmann
      all_success = False
280 e2715f69 Michael Hanselmann
281 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
282 e2715f69 Michael Hanselmann
        pass
283 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
284 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
285 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
286 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
287 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
288 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
289 fbf0262f Michael Hanselmann
        break
290 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
291 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
292 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
293 f1da30e6 Michael Hanselmann
        break
294 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
295 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
296 4cb1d919 Michael Hanselmann
        break
297 e2715f69 Michael Hanselmann
298 e2715f69 Michael Hanselmann
    if all_success:
299 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
300 e2715f69 Michael Hanselmann
301 e2715f69 Michael Hanselmann
    return status
302 e2715f69 Michael Hanselmann
303 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
304 ea03467c Iustin Pop
    """Selectively returns the log entries.
305 ea03467c Iustin Pop

306 ea03467c Iustin Pop
    @type newer_than: None or int
307 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
308 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
309 ea03467c Iustin Pop
        than this value
310 ea03467c Iustin Pop
    @rtype: list
311 ea03467c Iustin Pop
    @return: the list of the log entries selected
312 ea03467c Iustin Pop

313 ea03467c Iustin Pop
    """
314 6c5a7090 Michael Hanselmann
    if newer_than is None:
315 6c5a7090 Michael Hanselmann
      serial = -1
316 6c5a7090 Michael Hanselmann
    else:
317 6c5a7090 Michael Hanselmann
      serial = newer_than
318 6c5a7090 Michael Hanselmann
319 6c5a7090 Michael Hanselmann
    entries = []
320 6c5a7090 Michael Hanselmann
    for op in self.ops:
321 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
322 6c5a7090 Michael Hanselmann
323 6c5a7090 Michael Hanselmann
    return entries
324 6c5a7090 Michael Hanselmann
325 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
326 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
327 34327f51 Iustin Pop

328 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
329 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
330 34327f51 Iustin Pop
    finalised are not changed.
331 34327f51 Iustin Pop

332 34327f51 Iustin Pop
    @param status: a given opcode status
333 34327f51 Iustin Pop
    @param result: the opcode result
334 34327f51 Iustin Pop

335 34327f51 Iustin Pop
    """
336 34327f51 Iustin Pop
    not_marked = True
337 34327f51 Iustin Pop
    for op in self.ops:
338 34327f51 Iustin Pop
      if op.status in constants.OPS_FINALIZED:
339 34327f51 Iustin Pop
        assert not_marked, "Finalized opcodes found after non-finalized ones"
340 34327f51 Iustin Pop
        continue
341 34327f51 Iustin Pop
      op.status = status
342 34327f51 Iustin Pop
      op.result = result
343 34327f51 Iustin Pop
      not_marked = False
344 34327f51 Iustin Pop
345 f1048938 Iustin Pop
346 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
347 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
348 031a3e57 Michael Hanselmann
    """Initializes this class.
349 ea03467c Iustin Pop

350 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
351 031a3e57 Michael Hanselmann
    @param queue: Job queue
352 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
353 031a3e57 Michael Hanselmann
    @param job: Job object
354 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
355 031a3e57 Michael Hanselmann
    @param op: OpCode
356 031a3e57 Michael Hanselmann

357 031a3e57 Michael Hanselmann
    """
358 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
359 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
360 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
361 031a3e57 Michael Hanselmann
362 031a3e57 Michael Hanselmann
    self._queue = queue
363 031a3e57 Michael Hanselmann
    self._job = job
364 031a3e57 Michael Hanselmann
    self._op = op
365 031a3e57 Michael Hanselmann
366 031a3e57 Michael Hanselmann
  def NotifyStart(self):
367 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
368 e92376d7 Iustin Pop

369 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
370 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
371 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
372 031a3e57 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
373 e92376d7 Iustin Pop

374 e92376d7 Iustin Pop
    """
375 031a3e57 Michael Hanselmann
    self._queue.acquire()
376 e92376d7 Iustin Pop
    try:
377 031a3e57 Michael Hanselmann
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
378 031a3e57 Michael Hanselmann
                                 constants.OP_STATUS_CANCELING)
379 fbf0262f Michael Hanselmann
380 ef2df7d3 Michael Hanselmann
      # All locks are acquired by now
381 ef2df7d3 Michael Hanselmann
      self._job.lock_status = None
382 ef2df7d3 Michael Hanselmann
383 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
384 031a3e57 Michael Hanselmann
      if self._op.status == constants.OP_STATUS_CANCELING:
385 fbf0262f Michael Hanselmann
        raise CancelJob()
386 fbf0262f Michael Hanselmann
387 031a3e57 Michael Hanselmann
      self._op.status = constants.OP_STATUS_RUNNING
388 e92376d7 Iustin Pop
    finally:
389 031a3e57 Michael Hanselmann
      self._queue.release()
390 031a3e57 Michael Hanselmann
391 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
392 031a3e57 Michael Hanselmann
    """Append a log entry.
393 031a3e57 Michael Hanselmann

394 031a3e57 Michael Hanselmann
    """
395 031a3e57 Michael Hanselmann
    assert len(args) < 3
396 031a3e57 Michael Hanselmann
397 031a3e57 Michael Hanselmann
    if len(args) == 1:
398 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
399 031a3e57 Michael Hanselmann
      log_msg = args[0]
400 031a3e57 Michael Hanselmann
    else:
401 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
402 031a3e57 Michael Hanselmann
403 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
404 031a3e57 Michael Hanselmann
    # precision.
405 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
406 e92376d7 Iustin Pop
407 031a3e57 Michael Hanselmann
    self._queue.acquire()
408 031a3e57 Michael Hanselmann
    try:
409 031a3e57 Michael Hanselmann
      self._job.log_serial += 1
410 031a3e57 Michael Hanselmann
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
411 031a3e57 Michael Hanselmann
412 031a3e57 Michael Hanselmann
      self._job.change.notifyAll()
413 031a3e57 Michael Hanselmann
    finally:
414 031a3e57 Michael Hanselmann
      self._queue.release()
415 031a3e57 Michael Hanselmann
416 ef2df7d3 Michael Hanselmann
  def ReportLocks(self, msg):
417 ef2df7d3 Michael Hanselmann
    """Write locking information to the job.
418 ef2df7d3 Michael Hanselmann

419 ef2df7d3 Michael Hanselmann
    Called whenever the LU processor is waiting for a lock or has acquired one.
420 ef2df7d3 Michael Hanselmann

421 ef2df7d3 Michael Hanselmann
    """
422 ef2df7d3 Michael Hanselmann
    # Not getting the queue lock because this is a single assignment
423 ef2df7d3 Michael Hanselmann
    self._job.lock_status = msg
424 ef2df7d3 Michael Hanselmann
425 031a3e57 Michael Hanselmann
426 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
427 031a3e57 Michael Hanselmann
  """The actual job workers.
428 031a3e57 Michael Hanselmann

429 031a3e57 Michael Hanselmann
  """
430 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
431 e2715f69 Michael Hanselmann
    """Job executor.
432 e2715f69 Michael Hanselmann

433 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
434 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
435 e2715f69 Michael Hanselmann

436 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
437 ea03467c Iustin Pop
    @param job: the job to be processed
438 ea03467c Iustin Pop

439 e2715f69 Michael Hanselmann
    """
440 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
441 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
442 031a3e57 Michael Hanselmann
    queue = job.queue
443 e2715f69 Michael Hanselmann
    try:
444 85f03e0d Michael Hanselmann
      try:
445 85f03e0d Michael Hanselmann
        count = len(job.ops)
446 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
447 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
448 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
449 f6424741 Iustin Pop
            # this is a job that was partially completed before master
450 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
451 f6424741 Iustin Pop
            # are already completed successfully (if any did error
452 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
453 f6424741 Iustin Pop
            # resubmitted for processing)
454 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
455 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
456 f6424741 Iustin Pop
            continue
457 85f03e0d Michael Hanselmann
          try:
458 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
459 d21d09d6 Iustin Pop
                         op_summary)
460 85f03e0d Michael Hanselmann
461 85f03e0d Michael Hanselmann
            queue.acquire()
462 85f03e0d Michael Hanselmann
            try:
463 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
464 df0fb067 Iustin Pop
                raise CancelJob()
465 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
466 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
467 85f03e0d Michael Hanselmann
              op.result = None
468 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
469 c56ec146 Iustin Pop
              if idx == 0: # first opcode
470 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
471 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
472 85f03e0d Michael Hanselmann
473 38206f3c Iustin Pop
              input_opcode = op.input
474 85f03e0d Michael Hanselmann
            finally:
475 85f03e0d Michael Hanselmann
              queue.release()
476 85f03e0d Michael Hanselmann
477 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
478 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
479 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
480 85f03e0d Michael Hanselmann
481 85f03e0d Michael Hanselmann
            queue.acquire()
482 85f03e0d Michael Hanselmann
            try:
483 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
484 85f03e0d Michael Hanselmann
              op.result = result
485 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
486 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
487 85f03e0d Michael Hanselmann
            finally:
488 85f03e0d Michael Hanselmann
              queue.release()
489 85f03e0d Michael Hanselmann
490 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
491 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
492 fbf0262f Michael Hanselmann
          except CancelJob:
493 fbf0262f Michael Hanselmann
            # Will be handled further up
494 fbf0262f Michael Hanselmann
            raise
495 85f03e0d Michael Hanselmann
          except Exception, err:
496 85f03e0d Michael Hanselmann
            queue.acquire()
497 85f03e0d Michael Hanselmann
            try:
498 85f03e0d Michael Hanselmann
              try:
499 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
500 bcb66fca Iustin Pop
                if isinstance(err, errors.GenericError):
501 bcb66fca Iustin Pop
                  op.result = errors.EncodeException(err)
502 bcb66fca Iustin Pop
                else:
503 bcb66fca Iustin Pop
                  op.result = str(err)
504 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
505 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
506 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
507 85f03e0d Michael Hanselmann
              finally:
508 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
509 85f03e0d Michael Hanselmann
            finally:
510 85f03e0d Michael Hanselmann
              queue.release()
511 85f03e0d Michael Hanselmann
            raise
512 85f03e0d Michael Hanselmann
513 fbf0262f Michael Hanselmann
      except CancelJob:
514 fbf0262f Michael Hanselmann
        queue.acquire()
515 fbf0262f Michael Hanselmann
        try:
516 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
517 fbf0262f Michael Hanselmann
        finally:
518 fbf0262f Michael Hanselmann
          queue.release()
519 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
520 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
521 85f03e0d Michael Hanselmann
      except:
522 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
523 e2715f69 Michael Hanselmann
    finally:
524 85f03e0d Michael Hanselmann
      queue.acquire()
525 85f03e0d Michael Hanselmann
      try:
526 65548ed5 Michael Hanselmann
        try:
527 ef2df7d3 Michael Hanselmann
          job.lock_status = None
528 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
529 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
530 65548ed5 Michael Hanselmann
        finally:
531 65548ed5 Michael Hanselmann
          job_id = job.id
532 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
533 85f03e0d Michael Hanselmann
      finally:
534 85f03e0d Michael Hanselmann
        queue.release()
535 ef2df7d3 Michael Hanselmann
536 02fc74da Michael Hanselmann
      logging.info("Finished job %s, status = %s", job_id, status)
537 e2715f69 Michael Hanselmann
538 e2715f69 Michael Hanselmann
539 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
540 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
541 ea03467c Iustin Pop

542 ea03467c Iustin Pop
  """
543 5bdce580 Michael Hanselmann
  def __init__(self, queue):
544 89e2b4d2 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
545 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
546 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
547 5bdce580 Michael Hanselmann
    self.queue = queue
548 e2715f69 Michael Hanselmann
549 e2715f69 Michael Hanselmann
550 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
551 6c881c52 Iustin Pop
  """Decorator for "public" functions.
552 ea03467c Iustin Pop

553 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
554 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
555 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
556 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
557 6c881c52 Iustin Pop
  a '_queue_lock' argument.
558 ea03467c Iustin Pop

559 6c881c52 Iustin Pop
  @warning: Use this decorator only after utils.LockedMethod!
560 f1da30e6 Michael Hanselmann

561 6c881c52 Iustin Pop
  Example::
562 6c881c52 Iustin Pop
    @utils.LockedMethod
563 6c881c52 Iustin Pop
    @_RequireOpenQueue
564 6c881c52 Iustin Pop
    def Example(self):
565 6c881c52 Iustin Pop
      pass
566 db37da70 Michael Hanselmann

567 6c881c52 Iustin Pop
  """
568 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
569 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
570 6c881c52 Iustin Pop
    assert self._queue_lock is not None, "Queue should be open"
571 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
572 6c881c52 Iustin Pop
  return wrapper
573 db37da70 Michael Hanselmann
574 db37da70 Michael Hanselmann
575 6c881c52 Iustin Pop
class JobQueue(object):
576 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
577 db37da70 Michael Hanselmann

578 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
579 6c881c52 Iustin Pop

580 6c881c52 Iustin Pop
  """
581 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
582 db37da70 Michael Hanselmann
583 85f03e0d Michael Hanselmann
  def __init__(self, context):
584 ea03467c Iustin Pop
    """Constructor for JobQueue.
585 ea03467c Iustin Pop

586 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
587 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
588 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
589 ea03467c Iustin Pop
    running).
590 ea03467c Iustin Pop

591 ea03467c Iustin Pop
    @type context: GanetiContext
592 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
593 ea03467c Iustin Pop
        data and other ganeti objects
594 ea03467c Iustin Pop

595 ea03467c Iustin Pop
    """
596 5bdce580 Michael Hanselmann
    self.context = context
597 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
598 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
599 f1da30e6 Michael Hanselmann
600 85f03e0d Michael Hanselmann
    # Locking
601 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
602 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
603 85f03e0d Michael Hanselmann
    self.release = self._lock.release
604 85f03e0d Michael Hanselmann
605 04ab05ce Michael Hanselmann
    # Initialize
606 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
607 f1da30e6 Michael Hanselmann
608 04ab05ce Michael Hanselmann
    # Read serial file
609 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
610 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
611 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
612 c4beba1c Iustin Pop
613 23752136 Michael Hanselmann
    # Get initial list of nodes
614 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
615 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
616 59303563 Iustin Pop
                       if n.master_candidate)
617 8e00939c Michael Hanselmann
618 8e00939c Michael Hanselmann
    # Remove master node
619 8e00939c Michael Hanselmann
    try:
620 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
621 33987705 Iustin Pop
    except KeyError:
622 8e00939c Michael Hanselmann
      pass
623 23752136 Michael Hanselmann
624 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
625 23752136 Michael Hanselmann
626 85f03e0d Michael Hanselmann
    # Setup worker pool
627 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
628 85f03e0d Michael Hanselmann
    try:
629 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
630 16714921 Michael Hanselmann
      # we're still doing our work.
631 16714921 Michael Hanselmann
      self.acquire()
632 16714921 Michael Hanselmann
      try:
633 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
634 711b5124 Michael Hanselmann
635 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
636 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
637 711b5124 Michael Hanselmann
        lastinfo = time.time()
638 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
639 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
640 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
641 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
642 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
643 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
644 711b5124 Michael Hanselmann
            lastinfo = time.time()
645 711b5124 Michael Hanselmann
646 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
647 711b5124 Michael Hanselmann
648 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
649 16714921 Michael Hanselmann
          if job is None:
650 16714921 Michael Hanselmann
            continue
651 94ed59a5 Iustin Pop
652 16714921 Michael Hanselmann
          status = job.CalcStatus()
653 85f03e0d Michael Hanselmann
654 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
655 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
656 85f03e0d Michael Hanselmann
657 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
658 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
659 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
660 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
661 16714921 Michael Hanselmann
            try:
662 34327f51 Iustin Pop
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
663 34327f51 Iustin Pop
                                    "Unclean master daemon shutdown")
664 16714921 Michael Hanselmann
            finally:
665 16714921 Michael Hanselmann
              self.UpdateJobUnlocked(job)
666 711b5124 Michael Hanselmann
667 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
668 16714921 Michael Hanselmann
      finally:
669 16714921 Michael Hanselmann
        self.release()
670 16714921 Michael Hanselmann
    except:
671 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
672 16714921 Michael Hanselmann
      raise
673 85f03e0d Michael Hanselmann
674 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
675 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
676 99aabbed Iustin Pop
  def AddNode(self, node):
677 99aabbed Iustin Pop
    """Register a new node with the queue.
678 99aabbed Iustin Pop

679 99aabbed Iustin Pop
    @type node: L{objects.Node}
680 99aabbed Iustin Pop
    @param node: the node object to be added
681 99aabbed Iustin Pop

682 99aabbed Iustin Pop
    """
683 99aabbed Iustin Pop
    node_name = node.name
684 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
685 23752136 Michael Hanselmann
686 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
687 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
688 3cebe102 Michael Hanselmann
    msg = result.fail_msg
689 c8457ce7 Iustin Pop
    if msg:
690 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
691 c8457ce7 Iustin Pop
                      node_name, msg)
692 23752136 Michael Hanselmann
693 59303563 Iustin Pop
    if not node.master_candidate:
694 59303563 Iustin Pop
      # remove if existing, ignoring errors
695 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
696 59303563 Iustin Pop
      # and skip the replication of the job ids
697 59303563 Iustin Pop
      return
698 59303563 Iustin Pop
699 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
700 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
701 23752136 Michael Hanselmann
702 d2e03a33 Michael Hanselmann
    # Upload current serial file
703 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
704 d2e03a33 Michael Hanselmann
705 d2e03a33 Michael Hanselmann
    for file_name in files:
706 9f774ee8 Michael Hanselmann
      # Read file content
707 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
708 9f774ee8 Michael Hanselmann
709 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
710 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
711 a3811745 Michael Hanselmann
                                                  file_name, content)
712 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
713 c8457ce7 Iustin Pop
      if msg:
714 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
715 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
716 d2e03a33 Michael Hanselmann
717 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
718 d2e03a33 Michael Hanselmann
719 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
720 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
721 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
722 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
723 ea03467c Iustin Pop

724 ea03467c Iustin Pop
    @type node_name: str
725 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
726 ea03467c Iustin Pop

727 ea03467c Iustin Pop
    """
728 23752136 Michael Hanselmann
    try:
729 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
730 99aabbed Iustin Pop
      del self._nodes[node_name]
731 d2e03a33 Michael Hanselmann
    except KeyError:
732 23752136 Michael Hanselmann
      pass
733 23752136 Michael Hanselmann
734 7e950d31 Iustin Pop
  @staticmethod
735 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
736 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
737 ea03467c Iustin Pop

738 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
739 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
740 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
741 ea03467c Iustin Pop

742 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
743 ea03467c Iustin Pop
    @type nodes: list
744 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
745 ea03467c Iustin Pop
    @type failmsg: str
746 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
747 ea03467c Iustin Pop

748 ea03467c Iustin Pop
    """
749 e74798c1 Michael Hanselmann
    failed = []
750 e74798c1 Michael Hanselmann
    success = []
751 e74798c1 Michael Hanselmann
752 e74798c1 Michael Hanselmann
    for node in nodes:
753 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
754 c8457ce7 Iustin Pop
      if msg:
755 e74798c1 Michael Hanselmann
        failed.append(node)
756 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
757 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
758 c8457ce7 Iustin Pop
      else:
759 c8457ce7 Iustin Pop
        success.append(node)
760 e74798c1 Michael Hanselmann
761 e74798c1 Michael Hanselmann
    # +1 for the master node
762 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
763 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
764 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
765 e74798c1 Michael Hanselmann
766 99aabbed Iustin Pop
  def _GetNodeIp(self):
767 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
768 99aabbed Iustin Pop

769 ea03467c Iustin Pop
    @rtype: (list, list)
770 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
771 ea03467c Iustin Pop
        names and the second one with the node addresses
772 ea03467c Iustin Pop

773 99aabbed Iustin Pop
    """
774 99aabbed Iustin Pop
    name_list = self._nodes.keys()
775 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
776 99aabbed Iustin Pop
    return name_list, addr_list
777 99aabbed Iustin Pop
778 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
779 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
780 8e00939c Michael Hanselmann

781 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
782 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
783 ea03467c Iustin Pop

784 ea03467c Iustin Pop
    @type file_name: str
785 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
786 ea03467c Iustin Pop
    @type data: str
787 ea03467c Iustin Pop
    @param data: the new contents of the file
788 ea03467c Iustin Pop

789 8e00939c Michael Hanselmann
    """
790 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
791 8e00939c Michael Hanselmann
792 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
793 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
794 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
795 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
796 23752136 Michael Hanselmann
797 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
798 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
799 ea03467c Iustin Pop

800 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
801 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
802 ea03467c Iustin Pop

803 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
804 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
805 ea03467c Iustin Pop

806 ea03467c Iustin Pop
    """
807 dd875d32 Michael Hanselmann
    # Rename them locally
808 d7fd1f28 Michael Hanselmann
    for old, new in rename:
809 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
810 abc1f2ce Michael Hanselmann
811 dd875d32 Michael Hanselmann
    # ... and on all nodes
812 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
813 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
814 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
815 abc1f2ce Michael Hanselmann
816 7e950d31 Iustin Pop
  @staticmethod
817 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
818 ea03467c Iustin Pop
    """Convert a job ID to string format.
819 ea03467c Iustin Pop

820 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
821 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
822 ea03467c Iustin Pop
    abstract this change.
823 ea03467c Iustin Pop

824 ea03467c Iustin Pop
    @type job_id: int or long
825 ea03467c Iustin Pop
    @param job_id: the numeric job id
826 ea03467c Iustin Pop
    @rtype: str
827 ea03467c Iustin Pop
    @return: the formatted job id
828 ea03467c Iustin Pop

829 ea03467c Iustin Pop
    """
830 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
831 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
832 85f03e0d Michael Hanselmann
    if job_id < 0:
833 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
834 85f03e0d Michael Hanselmann
835 85f03e0d Michael Hanselmann
    return str(job_id)
836 85f03e0d Michael Hanselmann
837 58b22b6e Michael Hanselmann
  @classmethod
838 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
839 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
840 58b22b6e Michael Hanselmann

841 58b22b6e Michael Hanselmann
    @type job_id: str
842 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
843 58b22b6e Michael Hanselmann
    @rtype: str
844 58b22b6e Michael Hanselmann
    @return: Directory name
845 58b22b6e Michael Hanselmann

846 58b22b6e Michael Hanselmann
    """
847 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
848 58b22b6e Michael Hanselmann
849 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
850 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
851 f1da30e6 Michael Hanselmann

852 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
853 f1da30e6 Michael Hanselmann

854 009e73d0 Iustin Pop
    @type count: integer
855 009e73d0 Iustin Pop
    @param count: how many serials to return
856 ea03467c Iustin Pop
    @rtype: str
857 ea03467c Iustin Pop
    @return: a string representing the job identifier.
858 f1da30e6 Michael Hanselmann

859 f1da30e6 Michael Hanselmann
    """
860 009e73d0 Iustin Pop
    assert count > 0
861 f1da30e6 Michael Hanselmann
    # New number
862 009e73d0 Iustin Pop
    serial = self._last_serial + count
863 f1da30e6 Michael Hanselmann
864 f1da30e6 Michael Hanselmann
    # Write to file
865 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
866 23752136 Michael Hanselmann
                                        "%s\n" % serial)
867 f1da30e6 Michael Hanselmann
868 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
869 009e73d0 Iustin Pop
              for v in range(self._last_serial, serial + 1)]
870 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
871 f1da30e6 Michael Hanselmann
    self._last_serial = serial
872 f1da30e6 Michael Hanselmann
873 009e73d0 Iustin Pop
    return result
874 f1da30e6 Michael Hanselmann
875 85f03e0d Michael Hanselmann
  @staticmethod
876 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
877 ea03467c Iustin Pop
    """Returns the job file for a given job id.
878 ea03467c Iustin Pop

879 ea03467c Iustin Pop
    @type job_id: str
880 ea03467c Iustin Pop
    @param job_id: the job identifier
881 ea03467c Iustin Pop
    @rtype: str
882 ea03467c Iustin Pop
    @return: the path to the job file
883 ea03467c Iustin Pop

884 ea03467c Iustin Pop
    """
885 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
886 f1da30e6 Michael Hanselmann
887 58b22b6e Michael Hanselmann
  @classmethod
888 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
889 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
890 ea03467c Iustin Pop

891 ea03467c Iustin Pop
    @type job_id: str
892 ea03467c Iustin Pop
    @param job_id: the job identifier
893 ea03467c Iustin Pop
    @rtype: str
894 ea03467c Iustin Pop
    @return: the path to the archived job file
895 ea03467c Iustin Pop

896 ea03467c Iustin Pop
    """
897 58b22b6e Michael Hanselmann
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
898 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, path)
899 0cb94105 Michael Hanselmann
900 85f03e0d Michael Hanselmann
  @classmethod
901 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
902 ea03467c Iustin Pop
    """Extract the job id from a filename.
903 ea03467c Iustin Pop

904 ea03467c Iustin Pop
    @type name: str
905 ea03467c Iustin Pop
    @param name: the job filename
906 ea03467c Iustin Pop
    @rtype: job id or None
907 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
908 ea03467c Iustin Pop
        or None if the filename does not represent a valid
909 ea03467c Iustin Pop
        job file
910 ea03467c Iustin Pop

911 ea03467c Iustin Pop
    """
912 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
913 fae737ac Michael Hanselmann
    if m:
914 fae737ac Michael Hanselmann
      return m.group(1)
915 fae737ac Michael Hanselmann
    else:
916 fae737ac Michael Hanselmann
      return None
917 fae737ac Michael Hanselmann
918 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
919 911a495b Iustin Pop
    """Return all known job IDs.
920 911a495b Iustin Pop

921 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
922 911a495b Iustin Pop
    included. Currently this argument is unused.
923 911a495b Iustin Pop

924 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
925 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
926 ac0930b9 Iustin Pop
    extra IDs).
927 ac0930b9 Iustin Pop

928 ea03467c Iustin Pop
    @rtype: list
929 ea03467c Iustin Pop
    @return: the list of job IDs
930 ea03467c Iustin Pop

931 911a495b Iustin Pop
    """
932 2d54e29c Iustin Pop
    # pylint: disable-msg=W0613
933 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
934 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
935 f0d874fe Iustin Pop
    return jlist
936 911a495b Iustin Pop
937 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
938 ea03467c Iustin Pop
    """Returns the list of current job files.
939 ea03467c Iustin Pop

940 ea03467c Iustin Pop
    @rtype: list
941 ea03467c Iustin Pop
    @return: the list of job file names
942 ea03467c Iustin Pop

943 ea03467c Iustin Pop
    """
944 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
945 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
946 f1da30e6 Michael Hanselmann
947 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
948 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
949 ea03467c Iustin Pop

950 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
951 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
952 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
953 ea03467c Iustin Pop

954 ea03467c Iustin Pop
    @param job_id: the job id
955 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
956 ea03467c Iustin Pop
    @return: either None or the job object
957 ea03467c Iustin Pop

958 ea03467c Iustin Pop
    """
959 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
960 5685c1a5 Michael Hanselmann
    if job:
961 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
962 5685c1a5 Michael Hanselmann
      return job
963 ac0930b9 Iustin Pop
964 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
965 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
966 f1da30e6 Michael Hanselmann
    try:
967 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
968 f1da30e6 Michael Hanselmann
    except IOError, err:
969 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
970 f1da30e6 Michael Hanselmann
        return None
971 f1da30e6 Michael Hanselmann
      raise
972 13998ef2 Michael Hanselmann
973 13998ef2 Michael Hanselmann
    data = serializer.LoadJson(raw_data)
974 f1da30e6 Michael Hanselmann
975 94ed59a5 Iustin Pop
    try:
976 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
977 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
978 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
979 94ed59a5 Iustin Pop
      if filepath == new_path:
980 94ed59a5 Iustin Pop
        # job already archived (future case)
981 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
982 94ed59a5 Iustin Pop
      else:
983 94ed59a5 Iustin Pop
        # non-archived case
984 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
985 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
986 94ed59a5 Iustin Pop
      return None
987 94ed59a5 Iustin Pop
988 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
989 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
990 ac0930b9 Iustin Pop
    return job
991 f1da30e6 Michael Hanselmann
992 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
993 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
994 ea03467c Iustin Pop

995 ea03467c Iustin Pop
    @type job_ids: list
996 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
997 ea03467c Iustin Pop
        or a list of job IDs
998 ea03467c Iustin Pop
    @rtype: list
999 ea03467c Iustin Pop
    @return: the list of job objects
1000 ea03467c Iustin Pop

1001 ea03467c Iustin Pop
    """
1002 911a495b Iustin Pop
    if not job_ids:
1003 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
1004 f1da30e6 Michael Hanselmann
1005 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1006 f1da30e6 Michael Hanselmann
1007 686d7433 Iustin Pop
  @staticmethod
1008 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
1009 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
1010 686d7433 Iustin Pop

1011 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
1012 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
1013 686d7433 Iustin Pop

1014 ea03467c Iustin Pop
    @rtype: boolean
1015 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
1016 ea03467c Iustin Pop

1017 686d7433 Iustin Pop
    """
1018 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1019 686d7433 Iustin Pop
1020 3ccafd0e Iustin Pop
  @staticmethod
1021 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
1022 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1023 3ccafd0e Iustin Pop

1024 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
1025 3ccafd0e Iustin Pop
    and in the future we might merge them.
1026 3ccafd0e Iustin Pop

1027 ea03467c Iustin Pop
    @type drain_flag: boolean
1028 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1029 ea03467c Iustin Pop

1030 3ccafd0e Iustin Pop
    """
1031 3ccafd0e Iustin Pop
    if drain_flag:
1032 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1033 3ccafd0e Iustin Pop
    else:
1034 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1035 3ccafd0e Iustin Pop
    return True
1036 3ccafd0e Iustin Pop
1037 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1038 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
1039 85f03e0d Michael Hanselmann
    """Create and store a new job.
1040 f1da30e6 Michael Hanselmann

1041 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
1042 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
1043 c3f0a12f Iustin Pop

1044 009e73d0 Iustin Pop
    @type job_id: job ID
1045 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
1046 c3f0a12f Iustin Pop
    @type ops: list
1047 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
1048 ea03467c Iustin Pop
    @rtype: job ID
1049 ea03467c Iustin Pop
    @return: the job ID of the newly created job
1050 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
1051 c3f0a12f Iustin Pop

1052 c3f0a12f Iustin Pop
    """
1053 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
1054 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1055 f87b405e Michael Hanselmann
1056 f87b405e Michael Hanselmann
    # Check job queue size
1057 f87b405e Michael Hanselmann
    size = len(self._ListJobFiles())
1058 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1059 f87b405e Michael Hanselmann
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1060 f87b405e Michael Hanselmann
      # submission, though.
1061 f87b405e Michael Hanselmann
      #size = ...
1062 f87b405e Michael Hanselmann
      pass
1063 f87b405e Michael Hanselmann
1064 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1065 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
1066 f87b405e Michael Hanselmann
1067 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
1068 f1da30e6 Michael Hanselmann
1069 f1da30e6 Michael Hanselmann
    # Write to disk
1070 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
1071 f1da30e6 Michael Hanselmann
1072 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1073 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1074 ac0930b9 Iustin Pop
1075 85f03e0d Michael Hanselmann
    # Add to worker pool
1076 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
1077 85f03e0d Michael Hanselmann
1078 85f03e0d Michael Hanselmann
    return job.id
1079 f1da30e6 Michael Hanselmann
1080 2971c913 Iustin Pop
  @utils.LockedMethod
1081 2971c913 Iustin Pop
  @_RequireOpenQueue
1082 2971c913 Iustin Pop
  def SubmitJob(self, ops):
1083 2971c913 Iustin Pop
    """Create and store a new job.
1084 2971c913 Iustin Pop

1085 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1086 2971c913 Iustin Pop

1087 2971c913 Iustin Pop
    """
1088 009e73d0 Iustin Pop
    job_id = self._NewSerialsUnlocked(1)[0]
1089 009e73d0 Iustin Pop
    return self._SubmitJobUnlocked(job_id, ops)
1090 2971c913 Iustin Pop
1091 2971c913 Iustin Pop
  @utils.LockedMethod
1092 2971c913 Iustin Pop
  @_RequireOpenQueue
1093 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1094 2971c913 Iustin Pop
    """Create and store multiple jobs.
1095 2971c913 Iustin Pop

1096 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1097 2971c913 Iustin Pop

1098 2971c913 Iustin Pop
    """
1099 2971c913 Iustin Pop
    results = []
1100 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1101 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1102 2971c913 Iustin Pop
      try:
1103 009e73d0 Iustin Pop
        data = self._SubmitJobUnlocked(job_id, ops)
1104 2971c913 Iustin Pop
        status = True
1105 2971c913 Iustin Pop
      except errors.GenericError, err:
1106 2971c913 Iustin Pop
        data = str(err)
1107 2971c913 Iustin Pop
        status = False
1108 2971c913 Iustin Pop
      results.append((status, data))
1109 2971c913 Iustin Pop
1110 2971c913 Iustin Pop
    return results
1111 2971c913 Iustin Pop
1112 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1113 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
1114 ea03467c Iustin Pop
    """Update a job's on disk storage.
1115 ea03467c Iustin Pop

1116 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1117 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1118 ea03467c Iustin Pop
    nodes.
1119 ea03467c Iustin Pop

1120 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1121 ea03467c Iustin Pop
    @param job: the changed job
1122 ea03467c Iustin Pop

1123 ea03467c Iustin Pop
    """
1124 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1125 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1126 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1127 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
1128 ac0930b9 Iustin Pop
1129 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
1130 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
1131 dfe57c22 Michael Hanselmann
1132 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
1133 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
1134 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1135 5c735209 Iustin Pop
                        timeout):
1136 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1137 6c5a7090 Michael Hanselmann

1138 6c5a7090 Michael Hanselmann
    @type job_id: string
1139 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1140 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1141 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1142 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1143 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1144 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1145 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1146 5c735209 Iustin Pop
    @type timeout: float
1147 5c735209 Iustin Pop
    @param timeout: maximum time to wait
1148 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1149 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1150 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1151 ea03467c Iustin Pop

1152 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1153 ea03467c Iustin Pop
        we instead return a special value,
1154 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1155 ea03467c Iustin Pop
        as such by the clients
1156 6c5a7090 Michael Hanselmann

1157 6c5a7090 Michael Hanselmann
    """
1158 6bcb1446 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1159 6bcb1446 Michael Hanselmann
    if not job:
1160 6bcb1446 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1161 6bcb1446 Michael Hanselmann
      return None
1162 5c735209 Iustin Pop
1163 6bcb1446 Michael Hanselmann
    def _CheckForChanges():
1164 6bcb1446 Michael Hanselmann
      logging.debug("Waiting for changes in job %s", job_id)
1165 dfe57c22 Michael Hanselmann
1166 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
1167 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
1168 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
1169 dfe57c22 Michael Hanselmann
1170 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
1171 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
1172 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
1173 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
1174 dfe57c22 Michael Hanselmann
      # significantly different.
1175 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1176 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1177 dfe57c22 Michael Hanselmann
1178 6bcb1446 Michael Hanselmann
      # Don't even try to wait if the job is no longer running, there will be
1179 6bcb1446 Michael Hanselmann
      # no changes.
1180 6bcb1446 Michael Hanselmann
      if (status not in (constants.JOB_STATUS_QUEUED,
1181 6bcb1446 Michael Hanselmann
                         constants.JOB_STATUS_RUNNING,
1182 6bcb1446 Michael Hanselmann
                         constants.JOB_STATUS_WAITLOCK) or
1183 6bcb1446 Michael Hanselmann
          prev_job_info != job_info or
1184 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
1185 6bcb1446 Michael Hanselmann
        logging.debug("Job %s changed", job_id)
1186 6bcb1446 Michael Hanselmann
        return (job_info, log_entries)
1187 dfe57c22 Michael Hanselmann
1188 6bcb1446 Michael Hanselmann
      raise utils.RetryAgain()
1189 dfe57c22 Michael Hanselmann
1190 6bcb1446 Michael Hanselmann
    try:
1191 6bcb1446 Michael Hanselmann
      # Setting wait function to release the queue lock while waiting
1192 6bcb1446 Michael Hanselmann
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1193 6bcb1446 Michael Hanselmann
                         wait_fn=job.change.wait)
1194 6bcb1446 Michael Hanselmann
    except utils.RetryTimeout:
1195 6bcb1446 Michael Hanselmann
      return constants.JOB_NOTCHANGED
1196 dfe57c22 Michael Hanselmann
1197 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1198 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1199 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1200 188c5e0a Michael Hanselmann
    """Cancels a job.
1201 188c5e0a Michael Hanselmann

1202 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1203 ea03467c Iustin Pop

1204 188c5e0a Michael Hanselmann
    @type job_id: string
1205 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1206 188c5e0a Michael Hanselmann

1207 188c5e0a Michael Hanselmann
    """
1208 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1209 188c5e0a Michael Hanselmann
1210 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1211 188c5e0a Michael Hanselmann
    if not job:
1212 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1213 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1214 fbf0262f Michael Hanselmann
1215 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1216 188c5e0a Michael Hanselmann
1217 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1218 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1219 a9e97393 Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1220 a9e97393 Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1221 fbf0262f Michael Hanselmann
1222 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1223 fbf0262f Michael Hanselmann
      self.CancelJobUnlocked(job)
1224 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1225 188c5e0a Michael Hanselmann
1226 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1227 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1228 fbf0262f Michael Hanselmann
      try:
1229 34327f51 Iustin Pop
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1230 fbf0262f Michael Hanselmann
      finally:
1231 fbf0262f Michael Hanselmann
        self.UpdateJobUnlocked(job)
1232 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1233 fbf0262f Michael Hanselmann
1234 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1235 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
1236 fbf0262f Michael Hanselmann
    """Marks a job as canceled.
1237 fbf0262f Michael Hanselmann

1238 fbf0262f Michael Hanselmann
    """
1239 85f03e0d Michael Hanselmann
    try:
1240 34327f51 Iustin Pop
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1241 34327f51 Iustin Pop
                            "Job canceled by request")
1242 85f03e0d Michael Hanselmann
    finally:
1243 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1244 188c5e0a Michael Hanselmann
1245 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1246 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1247 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1248 c609f802 Michael Hanselmann

1249 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1250 25e7b43f Iustin Pop
    @param jobs: Job objects
1251 d7fd1f28 Michael Hanselmann
    @rtype: int
1252 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1253 c609f802 Michael Hanselmann

1254 c609f802 Michael Hanselmann
    """
1255 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1256 d7fd1f28 Michael Hanselmann
    rename_files = []
1257 d7fd1f28 Michael Hanselmann
    for job in jobs:
1258 d7fd1f28 Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1259 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_SUCCESS,
1260 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_ERROR):
1261 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1262 d7fd1f28 Michael Hanselmann
        continue
1263 c609f802 Michael Hanselmann
1264 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1265 c609f802 Michael Hanselmann
1266 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1267 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1268 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1269 c609f802 Michael Hanselmann
1270 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1271 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1272 f1da30e6 Michael Hanselmann
1273 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1274 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
1275 d7fd1f28 Michael Hanselmann
1276 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1277 78d12585 Michael Hanselmann
1278 07cd723a Iustin Pop
  @utils.LockedMethod
1279 07cd723a Iustin Pop
  @_RequireOpenQueue
1280 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1281 07cd723a Iustin Pop
    """Archives a job.
1282 07cd723a Iustin Pop

1283 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1284 ea03467c Iustin Pop

1285 07cd723a Iustin Pop
    @type job_id: string
1286 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1287 78d12585 Michael Hanselmann
    @rtype: bool
1288 78d12585 Michael Hanselmann
    @return: Whether job was archived
1289 07cd723a Iustin Pop

1290 07cd723a Iustin Pop
    """
1291 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1292 78d12585 Michael Hanselmann
1293 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1294 78d12585 Michael Hanselmann
    if not job:
1295 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1296 78d12585 Michael Hanselmann
      return False
1297 78d12585 Michael Hanselmann
1298 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1299 07cd723a Iustin Pop
1300 07cd723a Iustin Pop
  @utils.LockedMethod
1301 07cd723a Iustin Pop
  @_RequireOpenQueue
1302 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1303 07cd723a Iustin Pop
    """Archives all jobs based on age.
1304 07cd723a Iustin Pop

1305 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1306 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1307 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1308 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1309 07cd723a Iustin Pop

1310 07cd723a Iustin Pop
    @type age: int
1311 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1312 07cd723a Iustin Pop

1313 07cd723a Iustin Pop
    """
1314 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1315 07cd723a Iustin Pop
1316 07cd723a Iustin Pop
    now = time.time()
1317 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1318 f8ad5591 Michael Hanselmann
    archived_count = 0
1319 f8ad5591 Michael Hanselmann
    last_touched = 0
1320 f8ad5591 Michael Hanselmann
1321 f8ad5591 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1322 d7fd1f28 Michael Hanselmann
    pending = []
1323 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1324 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
1325 f8ad5591 Michael Hanselmann
1326 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1327 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1328 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1329 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1330 f8ad5591 Michael Hanselmann
        break
1331 f8ad5591 Michael Hanselmann
1332 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1333 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1334 f8ad5591 Michael Hanselmann
      if job:
1335 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1336 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1337 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1338 f8ad5591 Michael Hanselmann
          else:
1339 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1340 07cd723a Iustin Pop
        else:
1341 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1342 f8ad5591 Michael Hanselmann
1343 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1344 d7fd1f28 Michael Hanselmann
          pending.append(job)
1345 d7fd1f28 Michael Hanselmann
1346 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1347 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1348 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1349 d7fd1f28 Michael Hanselmann
            pending = []
1350 f8ad5591 Michael Hanselmann
1351 d7fd1f28 Michael Hanselmann
    if pending:
1352 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1353 07cd723a Iustin Pop
1354 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
1355 07cd723a Iustin Pop
1356 7e950d31 Iustin Pop
  @staticmethod
1357 7e950d31 Iustin Pop
  def _GetJobInfoUnlocked(job, fields):
1358 ea03467c Iustin Pop
    """Returns information about a job.
1359 ea03467c Iustin Pop

1360 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1361 ea03467c Iustin Pop
    @param job: the job which we query
1362 ea03467c Iustin Pop
    @type fields: list
1363 ea03467c Iustin Pop
    @param fields: names of fields to return
1364 ea03467c Iustin Pop
    @rtype: list
1365 ea03467c Iustin Pop
    @return: list with one element for each field
1366 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1367 ea03467c Iustin Pop
        has been passed
1368 ea03467c Iustin Pop

1369 ea03467c Iustin Pop
    """
1370 e2715f69 Michael Hanselmann
    row = []
1371 e2715f69 Michael Hanselmann
    for fname in fields:
1372 e2715f69 Michael Hanselmann
      if fname == "id":
1373 e2715f69 Michael Hanselmann
        row.append(job.id)
1374 e2715f69 Michael Hanselmann
      elif fname == "status":
1375 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1376 af30b2fd Michael Hanselmann
      elif fname == "ops":
1377 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1378 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1379 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1380 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1381 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1382 5b23c34c Iustin Pop
      elif fname == "oplog":
1383 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1384 c56ec146 Iustin Pop
      elif fname == "opstart":
1385 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1386 c56ec146 Iustin Pop
      elif fname == "opend":
1387 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1388 c56ec146 Iustin Pop
      elif fname == "received_ts":
1389 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1390 c56ec146 Iustin Pop
      elif fname == "start_ts":
1391 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1392 c56ec146 Iustin Pop
      elif fname == "end_ts":
1393 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1394 1d2dcdfd Michael Hanselmann
      elif fname == "lock_status":
1395 1d2dcdfd Michael Hanselmann
        row.append(job.lock_status)
1396 60dd1473 Iustin Pop
      elif fname == "summary":
1397 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1398 e2715f69 Michael Hanselmann
      else:
1399 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1400 e2715f69 Michael Hanselmann
    return row
1401 e2715f69 Michael Hanselmann
1402 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1403 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1404 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1405 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1406 e2715f69 Michael Hanselmann

1407 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1408 ea03467c Iustin Pop
    processing for each job.
1409 ea03467c Iustin Pop

1410 ea03467c Iustin Pop
    @type job_ids: list
1411 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1412 ea03467c Iustin Pop
    @type fields: list
1413 ea03467c Iustin Pop
    @param fields: names of fields to return
1414 ea03467c Iustin Pop
    @rtype: list
1415 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1416 ea03467c Iustin Pop
        the requested fields
1417 e2715f69 Michael Hanselmann

1418 e2715f69 Michael Hanselmann
    """
1419 85f03e0d Michael Hanselmann
    jobs = []
1420 e2715f69 Michael Hanselmann
1421 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1422 85f03e0d Michael Hanselmann
      if job is None:
1423 85f03e0d Michael Hanselmann
        jobs.append(None)
1424 85f03e0d Michael Hanselmann
      else:
1425 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1426 e2715f69 Michael Hanselmann
1427 85f03e0d Michael Hanselmann
    return jobs
1428 e2715f69 Michael Hanselmann
1429 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1430 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1431 e2715f69 Michael Hanselmann
  def Shutdown(self):
1432 e2715f69 Michael Hanselmann
    """Stops the job queue.
1433 e2715f69 Michael Hanselmann

1434 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1435 ea03467c Iustin Pop

1436 e2715f69 Michael Hanselmann
    """
1437 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1438 85f03e0d Michael Hanselmann
1439 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1440 04ab05ce Michael Hanselmann
    self._queue_lock = None