Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ a66bd91b

History | View | Annotate | Download (41.2 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
56 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
57 fbf0262f Michael Hanselmann

58 fbf0262f Michael Hanselmann
  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
62 ea03467c Iustin Pop
  """Returns the current timestamp.
63 ea03467c Iustin Pop

64 ea03467c Iustin Pop
  @rtype: tuple
65 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
66 ea03467c Iustin Pop

67 ea03467c Iustin Pop
  """
68 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
72 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
73 e2715f69 Michael Hanselmann

74 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
75 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
76 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
77 ea03467c Iustin Pop
  @ivar status: the current status
78 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
79 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
80 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
81 f1048938 Iustin Pop

82 e2715f69 Michael Hanselmann
  """
83 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
84 66d895a8 Iustin Pop
               "start_timestamp", "end_timestamp",
85 66d895a8 Iustin Pop
               "__weakref__"]
86 66d895a8 Iustin Pop
87 85f03e0d Michael Hanselmann
  def __init__(self, op):
88 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
89 ea03467c Iustin Pop

90 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
91 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
92 ea03467c Iustin Pop

93 ea03467c Iustin Pop
    """
94 85f03e0d Michael Hanselmann
    self.input = op
95 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
96 85f03e0d Michael Hanselmann
    self.result = None
97 85f03e0d Michael Hanselmann
    self.log = []
98 70552c46 Michael Hanselmann
    self.start_timestamp = None
99 70552c46 Michael Hanselmann
    self.end_timestamp = None
100 f1da30e6 Michael Hanselmann
101 f1da30e6 Michael Hanselmann
  @classmethod
102 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
103 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    @type state: dict
106 ea03467c Iustin Pop
    @param state: the serialized state
107 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
108 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
109 ea03467c Iustin Pop

110 ea03467c Iustin Pop
    """
111 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
112 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113 85f03e0d Michael Hanselmann
    obj.status = state["status"]
114 85f03e0d Michael Hanselmann
    obj.result = state["result"]
115 85f03e0d Michael Hanselmann
    obj.log = state["log"]
116 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
117 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
118 f1da30e6 Michael Hanselmann
    return obj
119 f1da30e6 Michael Hanselmann
120 f1da30e6 Michael Hanselmann
  def Serialize(self):
121 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
122 ea03467c Iustin Pop

123 ea03467c Iustin Pop
    @rtype: dict
124 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    """
127 6c5a7090 Michael Hanselmann
    return {
128 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
129 6c5a7090 Michael Hanselmann
      "status": self.status,
130 6c5a7090 Michael Hanselmann
      "result": self.result,
131 6c5a7090 Michael Hanselmann
      "log": self.log,
132 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
133 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
134 6c5a7090 Michael Hanselmann
      }
135 f1048938 Iustin Pop
136 e2715f69 Michael Hanselmann
137 e2715f69 Michael Hanselmann
class _QueuedJob(object):
138 e2715f69 Michael Hanselmann
  """In-memory job representation.
139 e2715f69 Michael Hanselmann

140 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
141 ea03467c Iustin Pop
  be taken care of by users of this class.
142 ea03467c Iustin Pop

143 ea03467c Iustin Pop
  @type queue: L{JobQueue}
144 ea03467c Iustin Pop
  @ivar queue: the parent queue
145 ea03467c Iustin Pop
  @ivar id: the job ID
146 ea03467c Iustin Pop
  @type ops: list
147 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
148 ea03467c Iustin Pop
  @type log_serial: int
149 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
150 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
151 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
152 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
153 ef2df7d3 Michael Hanselmann
  @ivar lock_status: In-memory locking information for debugging
154 ea03467c Iustin Pop
  @ivar change: a Condition variable we use for waiting for job changes
155 e2715f69 Michael Hanselmann

156 e2715f69 Michael Hanselmann
  """
157 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
158 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
159 ef2df7d3 Michael Hanselmann
               "lock_status", "change",
160 66d895a8 Iustin Pop
               "__weakref__"]
161 66d895a8 Iustin Pop
162 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
163 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
164 ea03467c Iustin Pop

165 ea03467c Iustin Pop
    @type queue: L{JobQueue}
166 ea03467c Iustin Pop
    @param queue: our parent queue
167 ea03467c Iustin Pop
    @type job_id: job_id
168 ea03467c Iustin Pop
    @param job_id: our job id
169 ea03467c Iustin Pop
    @type ops: list
170 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
171 ea03467c Iustin Pop
        in _QueuedOpCodes
172 ea03467c Iustin Pop

173 ea03467c Iustin Pop
    """
174 e2715f69 Michael Hanselmann
    if not ops:
175 ea03467c Iustin Pop
      # TODO: use a better exception
176 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
177 e2715f69 Michael Hanselmann
178 85f03e0d Michael Hanselmann
    self.queue = queue
179 f1da30e6 Michael Hanselmann
    self.id = job_id
180 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
181 6c5a7090 Michael Hanselmann
    self.log_serial = 0
182 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
183 c56ec146 Iustin Pop
    self.start_timestamp = None
184 c56ec146 Iustin Pop
    self.end_timestamp = None
185 6c5a7090 Michael Hanselmann
186 ef2df7d3 Michael Hanselmann
    # In-memory attributes
187 ef2df7d3 Michael Hanselmann
    self.lock_status = None
188 ef2df7d3 Michael Hanselmann
189 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
190 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
191 f1da30e6 Michael Hanselmann
192 f1da30e6 Michael Hanselmann
  @classmethod
193 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
194 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
195 ea03467c Iustin Pop

196 ea03467c Iustin Pop
    @type queue: L{JobQueue}
197 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
198 ea03467c Iustin Pop
    @type state: dict
199 ea03467c Iustin Pop
    @param state: the serialized state
200 ea03467c Iustin Pop
    @rtype: _JobQueue
201 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
202 ea03467c Iustin Pop

203 ea03467c Iustin Pop
    """
204 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
205 85f03e0d Michael Hanselmann
    obj.queue = queue
206 85f03e0d Michael Hanselmann
    obj.id = state["id"]
207 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
208 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
209 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
210 6c5a7090 Michael Hanselmann
211 ef2df7d3 Michael Hanselmann
    # In-memory attributes
212 ef2df7d3 Michael Hanselmann
    obj.lock_status = None
213 ef2df7d3 Michael Hanselmann
214 6c5a7090 Michael Hanselmann
    obj.ops = []
215 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
216 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
217 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
218 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
219 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
220 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
221 6c5a7090 Michael Hanselmann
222 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
223 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
224 6c5a7090 Michael Hanselmann
225 f1da30e6 Michael Hanselmann
    return obj
226 f1da30e6 Michael Hanselmann
227 f1da30e6 Michael Hanselmann
  def Serialize(self):
228 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
229 ea03467c Iustin Pop

230 ea03467c Iustin Pop
    @rtype: dict
231 ea03467c Iustin Pop
    @return: the serialized state
232 ea03467c Iustin Pop

233 ea03467c Iustin Pop
    """
234 f1da30e6 Michael Hanselmann
    return {
235 f1da30e6 Michael Hanselmann
      "id": self.id,
236 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
237 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
238 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
239 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
240 f1da30e6 Michael Hanselmann
      }
241 f1da30e6 Michael Hanselmann
242 85f03e0d Michael Hanselmann
  def CalcStatus(self):
243 ea03467c Iustin Pop
    """Compute the status of this job.
244 ea03467c Iustin Pop

245 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
246 ea03467c Iustin Pop
    based on their status, computes the job status.
247 ea03467c Iustin Pop

248 ea03467c Iustin Pop
    The algorithm is:
249 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
250 ea03467c Iustin Pop
        status will be the same
251 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
252 ea03467c Iustin Pop
          - waitlock
253 fbf0262f Michael Hanselmann
          - canceling
254 ea03467c Iustin Pop
          - running
255 ea03467c Iustin Pop

256 ea03467c Iustin Pop
        will determine the job status
257 ea03467c Iustin Pop

258 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
259 ea03467c Iustin Pop
        and the job status will be the same
260 ea03467c Iustin Pop

261 ea03467c Iustin Pop
    @return: the job status
262 ea03467c Iustin Pop

263 ea03467c Iustin Pop
    """
264 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
265 e2715f69 Michael Hanselmann
266 e2715f69 Michael Hanselmann
    all_success = True
267 85f03e0d Michael Hanselmann
    for op in self.ops:
268 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
269 e2715f69 Michael Hanselmann
        continue
270 e2715f69 Michael Hanselmann
271 e2715f69 Michael Hanselmann
      all_success = False
272 e2715f69 Michael Hanselmann
273 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
274 e2715f69 Michael Hanselmann
        pass
275 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
276 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
277 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
278 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
279 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
280 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
281 fbf0262f Michael Hanselmann
        break
282 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
283 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
284 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
285 f1da30e6 Michael Hanselmann
        break
286 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
287 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
288 4cb1d919 Michael Hanselmann
        break
289 e2715f69 Michael Hanselmann
290 e2715f69 Michael Hanselmann
    if all_success:
291 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
292 e2715f69 Michael Hanselmann
293 e2715f69 Michael Hanselmann
    return status
294 e2715f69 Michael Hanselmann
295 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
296 ea03467c Iustin Pop
    """Selectively returns the log entries.
297 ea03467c Iustin Pop

298 ea03467c Iustin Pop
    @type newer_than: None or int
299 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
300 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
301 ea03467c Iustin Pop
        than this value
302 ea03467c Iustin Pop
    @rtype: list
303 ea03467c Iustin Pop
    @return: the list of the log entries selected
304 ea03467c Iustin Pop

305 ea03467c Iustin Pop
    """
306 6c5a7090 Michael Hanselmann
    if newer_than is None:
307 6c5a7090 Michael Hanselmann
      serial = -1
308 6c5a7090 Michael Hanselmann
    else:
309 6c5a7090 Michael Hanselmann
      serial = newer_than
310 6c5a7090 Michael Hanselmann
311 6c5a7090 Michael Hanselmann
    entries = []
312 6c5a7090 Michael Hanselmann
    for op in self.ops:
313 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
314 6c5a7090 Michael Hanselmann
315 6c5a7090 Michael Hanselmann
    return entries
316 6c5a7090 Michael Hanselmann
317 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
318 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
319 34327f51 Iustin Pop

320 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
321 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
322 34327f51 Iustin Pop
    finalised are not changed.
323 34327f51 Iustin Pop

324 34327f51 Iustin Pop
    @param status: a given opcode status
325 34327f51 Iustin Pop
    @param result: the opcode result
326 34327f51 Iustin Pop

327 34327f51 Iustin Pop
    """
328 34327f51 Iustin Pop
    not_marked = True
329 34327f51 Iustin Pop
    for op in self.ops:
330 34327f51 Iustin Pop
      if op.status in constants.OPS_FINALIZED:
331 34327f51 Iustin Pop
        assert not_marked, "Finalized opcodes found after non-finalized ones"
332 34327f51 Iustin Pop
        continue
333 34327f51 Iustin Pop
      op.status = status
334 34327f51 Iustin Pop
      op.result = result
335 34327f51 Iustin Pop
      not_marked = False
336 34327f51 Iustin Pop
337 f1048938 Iustin Pop
338 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
339 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
340 031a3e57 Michael Hanselmann
    """Initializes this class.
341 ea03467c Iustin Pop

342 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
343 031a3e57 Michael Hanselmann
    @param queue: Job queue
344 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
345 031a3e57 Michael Hanselmann
    @param job: Job object
346 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
347 031a3e57 Michael Hanselmann
    @param op: OpCode
348 031a3e57 Michael Hanselmann

349 031a3e57 Michael Hanselmann
    """
350 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
351 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
352 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
353 031a3e57 Michael Hanselmann
354 031a3e57 Michael Hanselmann
    self._queue = queue
355 031a3e57 Michael Hanselmann
    self._job = job
356 031a3e57 Michael Hanselmann
    self._op = op
357 031a3e57 Michael Hanselmann
358 031a3e57 Michael Hanselmann
  def NotifyStart(self):
359 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
360 e92376d7 Iustin Pop

361 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
362 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
363 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
364 031a3e57 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
365 e92376d7 Iustin Pop

366 e92376d7 Iustin Pop
    """
367 031a3e57 Michael Hanselmann
    self._queue.acquire()
368 e92376d7 Iustin Pop
    try:
369 031a3e57 Michael Hanselmann
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
370 031a3e57 Michael Hanselmann
                                 constants.OP_STATUS_CANCELING)
371 fbf0262f Michael Hanselmann
372 ef2df7d3 Michael Hanselmann
      # All locks are acquired by now
373 ef2df7d3 Michael Hanselmann
      self._job.lock_status = None
374 ef2df7d3 Michael Hanselmann
375 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
376 031a3e57 Michael Hanselmann
      if self._op.status == constants.OP_STATUS_CANCELING:
377 fbf0262f Michael Hanselmann
        raise CancelJob()
378 fbf0262f Michael Hanselmann
379 031a3e57 Michael Hanselmann
      self._op.status = constants.OP_STATUS_RUNNING
380 e92376d7 Iustin Pop
    finally:
381 031a3e57 Michael Hanselmann
      self._queue.release()
382 031a3e57 Michael Hanselmann
383 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
384 031a3e57 Michael Hanselmann
    """Append a log entry.
385 031a3e57 Michael Hanselmann

386 031a3e57 Michael Hanselmann
    """
387 031a3e57 Michael Hanselmann
    assert len(args) < 3
388 031a3e57 Michael Hanselmann
389 031a3e57 Michael Hanselmann
    if len(args) == 1:
390 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
391 031a3e57 Michael Hanselmann
      log_msg = args[0]
392 031a3e57 Michael Hanselmann
    else:
393 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
394 031a3e57 Michael Hanselmann
395 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
396 031a3e57 Michael Hanselmann
    # precision.
397 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
398 e92376d7 Iustin Pop
399 031a3e57 Michael Hanselmann
    self._queue.acquire()
400 031a3e57 Michael Hanselmann
    try:
401 031a3e57 Michael Hanselmann
      self._job.log_serial += 1
402 031a3e57 Michael Hanselmann
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
403 031a3e57 Michael Hanselmann
404 031a3e57 Michael Hanselmann
      self._job.change.notifyAll()
405 031a3e57 Michael Hanselmann
    finally:
406 031a3e57 Michael Hanselmann
      self._queue.release()
407 031a3e57 Michael Hanselmann
408 ef2df7d3 Michael Hanselmann
  def ReportLocks(self, msg):
409 ef2df7d3 Michael Hanselmann
    """Write locking information to the job.
410 ef2df7d3 Michael Hanselmann

411 ef2df7d3 Michael Hanselmann
    Called whenever the LU processor is waiting for a lock or has acquired one.
412 ef2df7d3 Michael Hanselmann

413 ef2df7d3 Michael Hanselmann
    """
414 ef2df7d3 Michael Hanselmann
    # Not getting the queue lock because this is a single assignment
415 ef2df7d3 Michael Hanselmann
    self._job.lock_status = msg
416 ef2df7d3 Michael Hanselmann
417 031a3e57 Michael Hanselmann
418 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
419 031a3e57 Michael Hanselmann
  """The actual job workers.
420 031a3e57 Michael Hanselmann

421 031a3e57 Michael Hanselmann
  """
422 85f03e0d Michael Hanselmann
  def RunTask(self, job):
423 e2715f69 Michael Hanselmann
    """Job executor.
424 e2715f69 Michael Hanselmann

425 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
426 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
427 e2715f69 Michael Hanselmann

428 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
429 ea03467c Iustin Pop
    @param job: the job to be processed
430 ea03467c Iustin Pop

431 e2715f69 Michael Hanselmann
    """
432 d21d09d6 Iustin Pop
    logging.info("Worker %s processing job %s",
433 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
434 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
435 031a3e57 Michael Hanselmann
    queue = job.queue
436 e2715f69 Michael Hanselmann
    try:
437 85f03e0d Michael Hanselmann
      try:
438 85f03e0d Michael Hanselmann
        count = len(job.ops)
439 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
440 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
441 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
442 f6424741 Iustin Pop
            # this is a job that was partially completed before master
443 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
444 f6424741 Iustin Pop
            # are already completed successfully (if any did error
445 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
446 f6424741 Iustin Pop
            # resubmitted for processing)
447 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
448 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
449 f6424741 Iustin Pop
            continue
450 85f03e0d Michael Hanselmann
          try:
451 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
452 d21d09d6 Iustin Pop
                         op_summary)
453 85f03e0d Michael Hanselmann
454 85f03e0d Michael Hanselmann
            queue.acquire()
455 85f03e0d Michael Hanselmann
            try:
456 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
457 df0fb067 Iustin Pop
                raise CancelJob()
458 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
459 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
460 85f03e0d Michael Hanselmann
              op.result = None
461 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
462 c56ec146 Iustin Pop
              if idx == 0: # first opcode
463 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
464 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
465 85f03e0d Michael Hanselmann
466 38206f3c Iustin Pop
              input_opcode = op.input
467 85f03e0d Michael Hanselmann
            finally:
468 85f03e0d Michael Hanselmann
              queue.release()
469 85f03e0d Michael Hanselmann
470 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
471 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
472 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
473 85f03e0d Michael Hanselmann
474 85f03e0d Michael Hanselmann
            queue.acquire()
475 85f03e0d Michael Hanselmann
            try:
476 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
477 85f03e0d Michael Hanselmann
              op.result = result
478 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
479 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
480 85f03e0d Michael Hanselmann
            finally:
481 85f03e0d Michael Hanselmann
              queue.release()
482 85f03e0d Michael Hanselmann
483 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
484 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
485 fbf0262f Michael Hanselmann
          except CancelJob:
486 fbf0262f Michael Hanselmann
            # Will be handled further up
487 fbf0262f Michael Hanselmann
            raise
488 85f03e0d Michael Hanselmann
          except Exception, err:
489 85f03e0d Michael Hanselmann
            queue.acquire()
490 85f03e0d Michael Hanselmann
            try:
491 85f03e0d Michael Hanselmann
              try:
492 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
493 bcb66fca Iustin Pop
                if isinstance(err, errors.GenericError):
494 bcb66fca Iustin Pop
                  op.result = errors.EncodeException(err)
495 bcb66fca Iustin Pop
                else:
496 bcb66fca Iustin Pop
                  op.result = str(err)
497 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
498 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
499 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
500 85f03e0d Michael Hanselmann
              finally:
501 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
502 85f03e0d Michael Hanselmann
            finally:
503 85f03e0d Michael Hanselmann
              queue.release()
504 85f03e0d Michael Hanselmann
            raise
505 85f03e0d Michael Hanselmann
506 fbf0262f Michael Hanselmann
      except CancelJob:
507 fbf0262f Michael Hanselmann
        queue.acquire()
508 fbf0262f Michael Hanselmann
        try:
509 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
510 fbf0262f Michael Hanselmann
        finally:
511 fbf0262f Michael Hanselmann
          queue.release()
512 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
513 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
514 85f03e0d Michael Hanselmann
      except:
515 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
516 e2715f69 Michael Hanselmann
    finally:
517 85f03e0d Michael Hanselmann
      queue.acquire()
518 85f03e0d Michael Hanselmann
      try:
519 65548ed5 Michael Hanselmann
        try:
520 ef2df7d3 Michael Hanselmann
          job.lock_status = None
521 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
522 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
523 65548ed5 Michael Hanselmann
        finally:
524 65548ed5 Michael Hanselmann
          job_id = job.id
525 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
526 85f03e0d Michael Hanselmann
      finally:
527 85f03e0d Michael Hanselmann
        queue.release()
528 ef2df7d3 Michael Hanselmann
529 d21d09d6 Iustin Pop
      logging.info("Worker %s finished job %s, status = %s",
530 d21d09d6 Iustin Pop
                   self.worker_id, job_id, status)
531 e2715f69 Michael Hanselmann
532 e2715f69 Michael Hanselmann
533 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
534 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
535 ea03467c Iustin Pop

536 ea03467c Iustin Pop
  """
537 5bdce580 Michael Hanselmann
  def __init__(self, queue):
538 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
539 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
540 5bdce580 Michael Hanselmann
    self.queue = queue
541 e2715f69 Michael Hanselmann
542 e2715f69 Michael Hanselmann
543 85f03e0d Michael Hanselmann
class JobQueue(object):
544 5bbd3f7f Michael Hanselmann
  """Queue used to manage the jobs.
545 ea03467c Iustin Pop

546 ea03467c Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
547 ea03467c Iustin Pop

548 ea03467c Iustin Pop
  """
549 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
550 f1da30e6 Michael Hanselmann
551 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
552 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
553 db37da70 Michael Hanselmann

554 ea03467c Iustin Pop
    This function should be used for all 'public' functions. That is,
555 ea03467c Iustin Pop
    functions usually called from other classes.
556 db37da70 Michael Hanselmann

557 ea03467c Iustin Pop
    @warning: Use this decorator only after utils.LockedMethod!
558 db37da70 Michael Hanselmann

559 ea03467c Iustin Pop
    Example::
560 db37da70 Michael Hanselmann
      @utils.LockedMethod
561 db37da70 Michael Hanselmann
      @_RequireOpenQueue
562 db37da70 Michael Hanselmann
      def Example(self):
563 db37da70 Michael Hanselmann
        pass
564 db37da70 Michael Hanselmann

565 db37da70 Michael Hanselmann
    """
566 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
567 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
568 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
569 db37da70 Michael Hanselmann
    return wrapper
570 db37da70 Michael Hanselmann
571 85f03e0d Michael Hanselmann
  def __init__(self, context):
572 ea03467c Iustin Pop
    """Constructor for JobQueue.
573 ea03467c Iustin Pop

574 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
575 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
576 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
577 ea03467c Iustin Pop
    running).
578 ea03467c Iustin Pop

579 ea03467c Iustin Pop
    @type context: GanetiContext
580 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
581 ea03467c Iustin Pop
        data and other ganeti objects
582 ea03467c Iustin Pop

583 ea03467c Iustin Pop
    """
584 5bdce580 Michael Hanselmann
    self.context = context
585 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
586 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
587 f1da30e6 Michael Hanselmann
588 85f03e0d Michael Hanselmann
    # Locking
589 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
590 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
591 85f03e0d Michael Hanselmann
    self.release = self._lock.release
592 85f03e0d Michael Hanselmann
593 04ab05ce Michael Hanselmann
    # Initialize
594 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
595 f1da30e6 Michael Hanselmann
596 04ab05ce Michael Hanselmann
    # Read serial file
597 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
598 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
599 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
600 c4beba1c Iustin Pop
601 23752136 Michael Hanselmann
    # Get initial list of nodes
602 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
603 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
604 59303563 Iustin Pop
                       if n.master_candidate)
605 8e00939c Michael Hanselmann
606 8e00939c Michael Hanselmann
    # Remove master node
607 8e00939c Michael Hanselmann
    try:
608 99aabbed Iustin Pop
      del self._nodes[self._my_hostname]
609 33987705 Iustin Pop
    except KeyError:
610 8e00939c Michael Hanselmann
      pass
611 23752136 Michael Hanselmann
612 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
613 23752136 Michael Hanselmann
614 85f03e0d Michael Hanselmann
    # Setup worker pool
615 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
616 85f03e0d Michael Hanselmann
    try:
617 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
618 16714921 Michael Hanselmann
      # we're still doing our work.
619 16714921 Michael Hanselmann
      self.acquire()
620 16714921 Michael Hanselmann
      try:
621 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
622 711b5124 Michael Hanselmann
623 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
624 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
625 711b5124 Michael Hanselmann
        lastinfo = time.time()
626 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
627 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
628 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
629 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
630 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
631 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
632 711b5124 Michael Hanselmann
            lastinfo = time.time()
633 711b5124 Michael Hanselmann
634 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
635 711b5124 Michael Hanselmann
636 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
637 16714921 Michael Hanselmann
          if job is None:
638 16714921 Michael Hanselmann
            continue
639 94ed59a5 Iustin Pop
640 16714921 Michael Hanselmann
          status = job.CalcStatus()
641 85f03e0d Michael Hanselmann
642 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
643 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
644 85f03e0d Michael Hanselmann
645 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
646 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
647 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
648 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
649 16714921 Michael Hanselmann
            try:
650 34327f51 Iustin Pop
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
651 34327f51 Iustin Pop
                                    "Unclean master daemon shutdown")
652 16714921 Michael Hanselmann
            finally:
653 16714921 Michael Hanselmann
              self.UpdateJobUnlocked(job)
654 711b5124 Michael Hanselmann
655 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
656 16714921 Michael Hanselmann
      finally:
657 16714921 Michael Hanselmann
        self.release()
658 16714921 Michael Hanselmann
    except:
659 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
660 16714921 Michael Hanselmann
      raise
661 85f03e0d Michael Hanselmann
662 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
663 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
664 99aabbed Iustin Pop
  def AddNode(self, node):
665 99aabbed Iustin Pop
    """Register a new node with the queue.
666 99aabbed Iustin Pop

667 99aabbed Iustin Pop
    @type node: L{objects.Node}
668 99aabbed Iustin Pop
    @param node: the node object to be added
669 99aabbed Iustin Pop

670 99aabbed Iustin Pop
    """
671 99aabbed Iustin Pop
    node_name = node.name
672 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
673 23752136 Michael Hanselmann
674 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
675 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
676 3cebe102 Michael Hanselmann
    msg = result.fail_msg
677 c8457ce7 Iustin Pop
    if msg:
678 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
679 c8457ce7 Iustin Pop
                      node_name, msg)
680 23752136 Michael Hanselmann
681 59303563 Iustin Pop
    if not node.master_candidate:
682 59303563 Iustin Pop
      # remove if existing, ignoring errors
683 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
684 59303563 Iustin Pop
      # and skip the replication of the job ids
685 59303563 Iustin Pop
      return
686 59303563 Iustin Pop
687 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
688 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
689 23752136 Michael Hanselmann
690 d2e03a33 Michael Hanselmann
    # Upload current serial file
691 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
692 d2e03a33 Michael Hanselmann
693 d2e03a33 Michael Hanselmann
    for file_name in files:
694 9f774ee8 Michael Hanselmann
      # Read file content
695 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
696 9f774ee8 Michael Hanselmann
697 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
698 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
699 a3811745 Michael Hanselmann
                                                  file_name, content)
700 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
701 c8457ce7 Iustin Pop
      if msg:
702 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
703 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
704 d2e03a33 Michael Hanselmann
705 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
706 d2e03a33 Michael Hanselmann
707 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
708 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
709 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
710 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
711 ea03467c Iustin Pop

712 ea03467c Iustin Pop
    @type node_name: str
713 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
714 ea03467c Iustin Pop

715 ea03467c Iustin Pop
    """
716 23752136 Michael Hanselmann
    try:
717 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
718 99aabbed Iustin Pop
      del self._nodes[node_name]
719 d2e03a33 Michael Hanselmann
    except KeyError:
720 23752136 Michael Hanselmann
      pass
721 23752136 Michael Hanselmann
722 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
723 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
724 ea03467c Iustin Pop

725 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
726 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
727 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
728 ea03467c Iustin Pop

729 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
730 ea03467c Iustin Pop
    @type nodes: list
731 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
732 ea03467c Iustin Pop
    @type failmsg: str
733 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
734 ea03467c Iustin Pop

735 ea03467c Iustin Pop
    """
736 e74798c1 Michael Hanselmann
    failed = []
737 e74798c1 Michael Hanselmann
    success = []
738 e74798c1 Michael Hanselmann
739 e74798c1 Michael Hanselmann
    for node in nodes:
740 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
741 c8457ce7 Iustin Pop
      if msg:
742 e74798c1 Michael Hanselmann
        failed.append(node)
743 c8457ce7 Iustin Pop
        logging.error("RPC call %s failed on node %s: %s",
744 c8457ce7 Iustin Pop
                      result[node].call, node, msg)
745 c8457ce7 Iustin Pop
      else:
746 c8457ce7 Iustin Pop
        success.append(node)
747 e74798c1 Michael Hanselmann
748 e74798c1 Michael Hanselmann
    # +1 for the master node
749 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
750 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
751 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
752 e74798c1 Michael Hanselmann
753 99aabbed Iustin Pop
  def _GetNodeIp(self):
754 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
755 99aabbed Iustin Pop

756 ea03467c Iustin Pop
    @rtype: (list, list)
757 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
758 ea03467c Iustin Pop
        names and the second one with the node addresses
759 ea03467c Iustin Pop

760 99aabbed Iustin Pop
    """
761 99aabbed Iustin Pop
    name_list = self._nodes.keys()
762 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
763 99aabbed Iustin Pop
    return name_list, addr_list
764 99aabbed Iustin Pop
765 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
766 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
767 8e00939c Michael Hanselmann

768 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
769 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
770 ea03467c Iustin Pop

771 ea03467c Iustin Pop
    @type file_name: str
772 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
773 ea03467c Iustin Pop
    @type data: str
774 ea03467c Iustin Pop
    @param data: the new contents of the file
775 ea03467c Iustin Pop

776 8e00939c Michael Hanselmann
    """
777 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
778 8e00939c Michael Hanselmann
779 99aabbed Iustin Pop
    names, addrs = self._GetNodeIp()
780 a3811745 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
781 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
782 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
783 23752136 Michael Hanselmann
784 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
785 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
786 ea03467c Iustin Pop

787 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
788 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
789 ea03467c Iustin Pop

790 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
791 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
792 ea03467c Iustin Pop

793 ea03467c Iustin Pop
    """
794 dd875d32 Michael Hanselmann
    # Rename them locally
795 d7fd1f28 Michael Hanselmann
    for old, new in rename:
796 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
797 abc1f2ce Michael Hanselmann
798 dd875d32 Michael Hanselmann
    # ... and on all nodes
799 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
800 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
801 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
802 abc1f2ce Michael Hanselmann
803 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
804 ea03467c Iustin Pop
    """Convert a job ID to string format.
805 ea03467c Iustin Pop

806 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
807 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
808 ea03467c Iustin Pop
    abstract this change.
809 ea03467c Iustin Pop

810 ea03467c Iustin Pop
    @type job_id: int or long
811 ea03467c Iustin Pop
    @param job_id: the numeric job id
812 ea03467c Iustin Pop
    @rtype: str
813 ea03467c Iustin Pop
    @return: the formatted job id
814 ea03467c Iustin Pop

815 ea03467c Iustin Pop
    """
816 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
817 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
818 85f03e0d Michael Hanselmann
    if job_id < 0:
819 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
820 85f03e0d Michael Hanselmann
821 85f03e0d Michael Hanselmann
    return str(job_id)
822 85f03e0d Michael Hanselmann
823 58b22b6e Michael Hanselmann
  @classmethod
824 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
825 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
826 58b22b6e Michael Hanselmann

827 58b22b6e Michael Hanselmann
    @type job_id: str
828 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
829 58b22b6e Michael Hanselmann
    @rtype: str
830 58b22b6e Michael Hanselmann
    @return: Directory name
831 58b22b6e Michael Hanselmann

832 58b22b6e Michael Hanselmann
    """
833 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
834 58b22b6e Michael Hanselmann
835 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
836 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
837 f1da30e6 Michael Hanselmann

838 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
839 f1da30e6 Michael Hanselmann

840 009e73d0 Iustin Pop
    @type count: integer
841 009e73d0 Iustin Pop
    @param count: how many serials to return
842 ea03467c Iustin Pop
    @rtype: str
843 ea03467c Iustin Pop
    @return: a string representing the job identifier.
844 f1da30e6 Michael Hanselmann

845 f1da30e6 Michael Hanselmann
    """
846 009e73d0 Iustin Pop
    assert count > 0
847 f1da30e6 Michael Hanselmann
    # New number
848 009e73d0 Iustin Pop
    serial = self._last_serial + count
849 f1da30e6 Michael Hanselmann
850 f1da30e6 Michael Hanselmann
    # Write to file
851 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
852 23752136 Michael Hanselmann
                                        "%s\n" % serial)
853 f1da30e6 Michael Hanselmann
854 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
855 009e73d0 Iustin Pop
              for v in range(self._last_serial, serial + 1)]
856 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
857 f1da30e6 Michael Hanselmann
    self._last_serial = serial
858 f1da30e6 Michael Hanselmann
859 009e73d0 Iustin Pop
    return result
860 f1da30e6 Michael Hanselmann
861 85f03e0d Michael Hanselmann
  @staticmethod
862 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
863 ea03467c Iustin Pop
    """Returns the job file for a given job id.
864 ea03467c Iustin Pop

865 ea03467c Iustin Pop
    @type job_id: str
866 ea03467c Iustin Pop
    @param job_id: the job identifier
867 ea03467c Iustin Pop
    @rtype: str
868 ea03467c Iustin Pop
    @return: the path to the job file
869 ea03467c Iustin Pop

870 ea03467c Iustin Pop
    """
871 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
872 f1da30e6 Michael Hanselmann
873 58b22b6e Michael Hanselmann
  @classmethod
874 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
875 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
876 ea03467c Iustin Pop

877 ea03467c Iustin Pop
    @type job_id: str
878 ea03467c Iustin Pop
    @param job_id: the job identifier
879 ea03467c Iustin Pop
    @rtype: str
880 ea03467c Iustin Pop
    @return: the path to the archived job file
881 ea03467c Iustin Pop

882 ea03467c Iustin Pop
    """
883 58b22b6e Michael Hanselmann
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
884 58b22b6e Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
885 0cb94105 Michael Hanselmann
886 85f03e0d Michael Hanselmann
  @classmethod
887 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
888 ea03467c Iustin Pop
    """Extract the job id from a filename.
889 ea03467c Iustin Pop

890 ea03467c Iustin Pop
    @type name: str
891 ea03467c Iustin Pop
    @param name: the job filename
892 ea03467c Iustin Pop
    @rtype: job id or None
893 ea03467c Iustin Pop
    @return: the job id corresponding to the given filename,
894 ea03467c Iustin Pop
        or None if the filename does not represent a valid
895 ea03467c Iustin Pop
        job file
896 ea03467c Iustin Pop

897 ea03467c Iustin Pop
    """
898 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
899 fae737ac Michael Hanselmann
    if m:
900 fae737ac Michael Hanselmann
      return m.group(1)
901 fae737ac Michael Hanselmann
    else:
902 fae737ac Michael Hanselmann
      return None
903 fae737ac Michael Hanselmann
904 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
905 911a495b Iustin Pop
    """Return all known job IDs.
906 911a495b Iustin Pop

907 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
908 911a495b Iustin Pop
    included. Currently this argument is unused.
909 911a495b Iustin Pop

910 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
911 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
912 ac0930b9 Iustin Pop
    extra IDs).
913 ac0930b9 Iustin Pop

914 ea03467c Iustin Pop
    @rtype: list
915 ea03467c Iustin Pop
    @return: the list of job IDs
916 ea03467c Iustin Pop

917 911a495b Iustin Pop
    """
918 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
919 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
920 f0d874fe Iustin Pop
    return jlist
921 911a495b Iustin Pop
922 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
923 ea03467c Iustin Pop
    """Returns the list of current job files.
924 ea03467c Iustin Pop

925 ea03467c Iustin Pop
    @rtype: list
926 ea03467c Iustin Pop
    @return: the list of job file names
927 ea03467c Iustin Pop

928 ea03467c Iustin Pop
    """
929 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
930 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
931 f1da30e6 Michael Hanselmann
932 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
933 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
934 ea03467c Iustin Pop

935 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
936 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
937 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
938 ea03467c Iustin Pop

939 ea03467c Iustin Pop
    @param job_id: the job id
940 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
941 ea03467c Iustin Pop
    @return: either None or the job object
942 ea03467c Iustin Pop

943 ea03467c Iustin Pop
    """
944 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
945 5685c1a5 Michael Hanselmann
    if job:
946 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
947 5685c1a5 Michael Hanselmann
      return job
948 ac0930b9 Iustin Pop
949 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
950 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
951 f1da30e6 Michael Hanselmann
    try:
952 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
953 f1da30e6 Michael Hanselmann
    except IOError, err:
954 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
955 f1da30e6 Michael Hanselmann
        return None
956 f1da30e6 Michael Hanselmann
      raise
957 13998ef2 Michael Hanselmann
958 13998ef2 Michael Hanselmann
    data = serializer.LoadJson(raw_data)
959 f1da30e6 Michael Hanselmann
960 94ed59a5 Iustin Pop
    try:
961 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
962 94ed59a5 Iustin Pop
    except Exception, err:
963 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
964 94ed59a5 Iustin Pop
      if filepath == new_path:
965 94ed59a5 Iustin Pop
        # job already archived (future case)
966 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
967 94ed59a5 Iustin Pop
      else:
968 94ed59a5 Iustin Pop
        # non-archived case
969 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
970 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
971 94ed59a5 Iustin Pop
      return None
972 94ed59a5 Iustin Pop
973 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
974 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
975 ac0930b9 Iustin Pop
    return job
976 f1da30e6 Michael Hanselmann
977 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
978 ea03467c Iustin Pop
    """Return a list of jobs based on their IDs.
979 ea03467c Iustin Pop

980 ea03467c Iustin Pop
    @type job_ids: list
981 ea03467c Iustin Pop
    @param job_ids: either an empty list (meaning all jobs),
982 ea03467c Iustin Pop
        or a list of job IDs
983 ea03467c Iustin Pop
    @rtype: list
984 ea03467c Iustin Pop
    @return: the list of job objects
985 ea03467c Iustin Pop

986 ea03467c Iustin Pop
    """
987 911a495b Iustin Pop
    if not job_ids:
988 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
989 f1da30e6 Michael Hanselmann
990 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
991 f1da30e6 Michael Hanselmann
992 686d7433 Iustin Pop
  @staticmethod
993 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
994 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
995 686d7433 Iustin Pop

996 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
997 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
998 686d7433 Iustin Pop

999 ea03467c Iustin Pop
    @rtype: boolean
1000 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
1001 ea03467c Iustin Pop

1002 686d7433 Iustin Pop
    """
1003 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1004 686d7433 Iustin Pop
1005 3ccafd0e Iustin Pop
  @staticmethod
1006 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
1007 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1008 3ccafd0e Iustin Pop

1009 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
1010 3ccafd0e Iustin Pop
    and in the future we might merge them.
1011 3ccafd0e Iustin Pop

1012 ea03467c Iustin Pop
    @type drain_flag: boolean
1013 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1014 ea03467c Iustin Pop

1015 3ccafd0e Iustin Pop
    """
1016 3ccafd0e Iustin Pop
    if drain_flag:
1017 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1018 3ccafd0e Iustin Pop
    else:
1019 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1020 3ccafd0e Iustin Pop
    return True
1021 3ccafd0e Iustin Pop
1022 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1023 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
1024 85f03e0d Michael Hanselmann
    """Create and store a new job.
1025 f1da30e6 Michael Hanselmann

1026 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
1027 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
1028 c3f0a12f Iustin Pop

1029 009e73d0 Iustin Pop
    @type job_id: job ID
1030 009e73d0 Iustin Pop
    @param jod_id: the job ID for the new job
1031 c3f0a12f Iustin Pop
    @type ops: list
1032 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
1033 ea03467c Iustin Pop
    @rtype: job ID
1034 ea03467c Iustin Pop
    @return: the job ID of the newly created job
1035 ea03467c Iustin Pop
    @raise errors.JobQueueDrainError: if the job is marked for draining
1036 c3f0a12f Iustin Pop

1037 c3f0a12f Iustin Pop
    """
1038 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
1039 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1040 f87b405e Michael Hanselmann
1041 f87b405e Michael Hanselmann
    # Check job queue size
1042 f87b405e Michael Hanselmann
    size = len(self._ListJobFiles())
1043 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1044 f87b405e Michael Hanselmann
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1045 f87b405e Michael Hanselmann
      # submission, though.
1046 f87b405e Michael Hanselmann
      #size = ...
1047 f87b405e Michael Hanselmann
      pass
1048 f87b405e Michael Hanselmann
1049 f87b405e Michael Hanselmann
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1050 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
1051 f87b405e Michael Hanselmann
1052 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
1053 f1da30e6 Michael Hanselmann
1054 f1da30e6 Michael Hanselmann
    # Write to disk
1055 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
1056 f1da30e6 Michael Hanselmann
1057 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1058 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1059 ac0930b9 Iustin Pop
1060 85f03e0d Michael Hanselmann
    # Add to worker pool
1061 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
1062 85f03e0d Michael Hanselmann
1063 85f03e0d Michael Hanselmann
    return job.id
1064 f1da30e6 Michael Hanselmann
1065 2971c913 Iustin Pop
  @utils.LockedMethod
1066 2971c913 Iustin Pop
  @_RequireOpenQueue
1067 2971c913 Iustin Pop
  def SubmitJob(self, ops):
1068 2971c913 Iustin Pop
    """Create and store a new job.
1069 2971c913 Iustin Pop

1070 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1071 2971c913 Iustin Pop

1072 2971c913 Iustin Pop
    """
1073 009e73d0 Iustin Pop
    job_id = self._NewSerialsUnlocked(1)[0]
1074 009e73d0 Iustin Pop
    return self._SubmitJobUnlocked(job_id, ops)
1075 2971c913 Iustin Pop
1076 2971c913 Iustin Pop
  @utils.LockedMethod
1077 2971c913 Iustin Pop
  @_RequireOpenQueue
1078 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1079 2971c913 Iustin Pop
    """Create and store multiple jobs.
1080 2971c913 Iustin Pop

1081 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1082 2971c913 Iustin Pop

1083 2971c913 Iustin Pop
    """
1084 2971c913 Iustin Pop
    results = []
1085 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1086 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1087 2971c913 Iustin Pop
      try:
1088 009e73d0 Iustin Pop
        data = self._SubmitJobUnlocked(job_id, ops)
1089 2971c913 Iustin Pop
        status = True
1090 2971c913 Iustin Pop
      except errors.GenericError, err:
1091 2971c913 Iustin Pop
        data = str(err)
1092 2971c913 Iustin Pop
        status = False
1093 2971c913 Iustin Pop
      results.append((status, data))
1094 2971c913 Iustin Pop
1095 2971c913 Iustin Pop
    return results
1096 2971c913 Iustin Pop
1097 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1098 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
1099 ea03467c Iustin Pop
    """Update a job's on disk storage.
1100 ea03467c Iustin Pop

1101 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1102 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1103 ea03467c Iustin Pop
    nodes.
1104 ea03467c Iustin Pop

1105 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1106 ea03467c Iustin Pop
    @param job: the changed job
1107 ea03467c Iustin Pop

1108 ea03467c Iustin Pop
    """
1109 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1110 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1111 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1112 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
1113 ac0930b9 Iustin Pop
1114 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
1115 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
1116 dfe57c22 Michael Hanselmann
1117 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
1118 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
1119 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1120 5c735209 Iustin Pop
                        timeout):
1121 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1122 6c5a7090 Michael Hanselmann

1123 6c5a7090 Michael Hanselmann
    @type job_id: string
1124 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1125 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1126 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1127 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1128 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1129 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1130 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1131 5c735209 Iustin Pop
    @type timeout: float
1132 5c735209 Iustin Pop
    @param timeout: maximum time to wait
1133 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1134 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1135 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1136 ea03467c Iustin Pop

1137 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1138 ea03467c Iustin Pop
        we instead return a special value,
1139 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1140 ea03467c Iustin Pop
        as such by the clients
1141 6c5a7090 Michael Hanselmann

1142 6c5a7090 Michael Hanselmann
    """
1143 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
1144 6e237482 Michael Hanselmann
1145 6e237482 Michael Hanselmann
    job_info = None
1146 6e237482 Michael Hanselmann
    log_entries = None
1147 6e237482 Michael Hanselmann
1148 5c735209 Iustin Pop
    end_time = time.time() + timeout
1149 dfe57c22 Michael Hanselmann
    while True:
1150 5c735209 Iustin Pop
      delta_time = end_time - time.time()
1151 5c735209 Iustin Pop
      if delta_time < 0:
1152 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
1153 5c735209 Iustin Pop
1154 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1155 6c5a7090 Michael Hanselmann
      if not job:
1156 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
1157 6c5a7090 Michael Hanselmann
        break
1158 dfe57c22 Michael Hanselmann
1159 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
1160 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
1161 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
1162 dfe57c22 Michael Hanselmann
1163 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
1164 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
1165 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
1166 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
1167 dfe57c22 Michael Hanselmann
      # significantly different.
1168 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1169 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1170 dfe57c22 Michael Hanselmann
1171 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
1172 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
1173 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
1174 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
1175 6c5a7090 Michael Hanselmann
        # no changes.
1176 dfe57c22 Michael Hanselmann
        break
1177 dfe57c22 Michael Hanselmann
1178 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
1179 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
1180 6c5a7090 Michael Hanselmann
        break
1181 6c5a7090 Michael Hanselmann
1182 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
1183 6c5a7090 Michael Hanselmann
1184 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
1185 5c735209 Iustin Pop
      job.change.wait(delta_time)
1186 dfe57c22 Michael Hanselmann
1187 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
1188 dfe57c22 Michael Hanselmann
1189 6e237482 Michael Hanselmann
    if job_info is None and log_entries is None:
1190 6e237482 Michael Hanselmann
      return None
1191 6e237482 Michael Hanselmann
    else:
1192 6e237482 Michael Hanselmann
      return (job_info, log_entries)
1193 dfe57c22 Michael Hanselmann
1194 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1195 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1196 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1197 188c5e0a Michael Hanselmann
    """Cancels a job.
1198 188c5e0a Michael Hanselmann

1199 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1200 ea03467c Iustin Pop

1201 188c5e0a Michael Hanselmann
    @type job_id: string
1202 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1203 188c5e0a Michael Hanselmann

1204 188c5e0a Michael Hanselmann
    """
1205 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1206 188c5e0a Michael Hanselmann
1207 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1208 188c5e0a Michael Hanselmann
    if not job:
1209 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1210 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1211 fbf0262f Michael Hanselmann
1212 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1213 188c5e0a Michael Hanselmann
1214 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1215 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1216 a9e97393 Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1217 a9e97393 Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1218 fbf0262f Michael Hanselmann
1219 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1220 fbf0262f Michael Hanselmann
      self.CancelJobUnlocked(job)
1221 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1222 188c5e0a Michael Hanselmann
1223 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1224 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1225 fbf0262f Michael Hanselmann
      try:
1226 34327f51 Iustin Pop
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1227 fbf0262f Michael Hanselmann
      finally:
1228 fbf0262f Michael Hanselmann
        self.UpdateJobUnlocked(job)
1229 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1230 fbf0262f Michael Hanselmann
1231 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1232 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
1233 fbf0262f Michael Hanselmann
    """Marks a job as canceled.
1234 fbf0262f Michael Hanselmann

1235 fbf0262f Michael Hanselmann
    """
1236 85f03e0d Michael Hanselmann
    try:
1237 34327f51 Iustin Pop
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1238 34327f51 Iustin Pop
                            "Job canceled by request")
1239 85f03e0d Michael Hanselmann
    finally:
1240 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
1241 188c5e0a Michael Hanselmann
1242 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1243 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1244 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1245 c609f802 Michael Hanselmann

1246 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1247 25e7b43f Iustin Pop
    @param jobs: Job objects
1248 d7fd1f28 Michael Hanselmann
    @rtype: int
1249 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1250 c609f802 Michael Hanselmann

1251 c609f802 Michael Hanselmann
    """
1252 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1253 d7fd1f28 Michael Hanselmann
    rename_files = []
1254 d7fd1f28 Michael Hanselmann
    for job in jobs:
1255 d7fd1f28 Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1256 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_SUCCESS,
1257 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_ERROR):
1258 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1259 d7fd1f28 Michael Hanselmann
        continue
1260 c609f802 Michael Hanselmann
1261 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1262 c609f802 Michael Hanselmann
1263 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1264 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1265 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1266 c609f802 Michael Hanselmann
1267 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1268 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1269 f1da30e6 Michael Hanselmann
1270 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1271 d7fd1f28 Michael Hanselmann
                  ", ".join(job.id for job in archive_jobs))
1272 d7fd1f28 Michael Hanselmann
1273 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1274 78d12585 Michael Hanselmann
1275 07cd723a Iustin Pop
  @utils.LockedMethod
1276 07cd723a Iustin Pop
  @_RequireOpenQueue
1277 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1278 07cd723a Iustin Pop
    """Archives a job.
1279 07cd723a Iustin Pop

1280 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1281 ea03467c Iustin Pop

1282 07cd723a Iustin Pop
    @type job_id: string
1283 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1284 78d12585 Michael Hanselmann
    @rtype: bool
1285 78d12585 Michael Hanselmann
    @return: Whether job was archived
1286 07cd723a Iustin Pop

1287 07cd723a Iustin Pop
    """
1288 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1289 78d12585 Michael Hanselmann
1290 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1291 78d12585 Michael Hanselmann
    if not job:
1292 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1293 78d12585 Michael Hanselmann
      return False
1294 78d12585 Michael Hanselmann
1295 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1296 07cd723a Iustin Pop
1297 07cd723a Iustin Pop
  @utils.LockedMethod
1298 07cd723a Iustin Pop
  @_RequireOpenQueue
1299 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1300 07cd723a Iustin Pop
    """Archives all jobs based on age.
1301 07cd723a Iustin Pop

1302 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1303 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1304 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1305 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1306 07cd723a Iustin Pop

1307 07cd723a Iustin Pop
    @type age: int
1308 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1309 07cd723a Iustin Pop

1310 07cd723a Iustin Pop
    """
1311 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1312 07cd723a Iustin Pop
1313 07cd723a Iustin Pop
    now = time.time()
1314 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1315 f8ad5591 Michael Hanselmann
    archived_count = 0
1316 f8ad5591 Michael Hanselmann
    last_touched = 0
1317 f8ad5591 Michael Hanselmann
1318 f8ad5591 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1319 d7fd1f28 Michael Hanselmann
    pending = []
1320 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1321 f8ad5591 Michael Hanselmann
      last_touched = idx
1322 f8ad5591 Michael Hanselmann
1323 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1324 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1325 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1326 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1327 f8ad5591 Michael Hanselmann
        break
1328 f8ad5591 Michael Hanselmann
1329 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1330 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1331 f8ad5591 Michael Hanselmann
      if job:
1332 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1333 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1334 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1335 f8ad5591 Michael Hanselmann
          else:
1336 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1337 07cd723a Iustin Pop
        else:
1338 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1339 f8ad5591 Michael Hanselmann
1340 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1341 d7fd1f28 Michael Hanselmann
          pending.append(job)
1342 d7fd1f28 Michael Hanselmann
1343 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1344 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1345 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1346 d7fd1f28 Michael Hanselmann
            pending = []
1347 f8ad5591 Michael Hanselmann
1348 d7fd1f28 Michael Hanselmann
    if pending:
1349 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1350 07cd723a Iustin Pop
1351 f8ad5591 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched - 1)
1352 07cd723a Iustin Pop
1353 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
1354 ea03467c Iustin Pop
    """Returns information about a job.
1355 ea03467c Iustin Pop

1356 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1357 ea03467c Iustin Pop
    @param job: the job which we query
1358 ea03467c Iustin Pop
    @type fields: list
1359 ea03467c Iustin Pop
    @param fields: names of fields to return
1360 ea03467c Iustin Pop
    @rtype: list
1361 ea03467c Iustin Pop
    @return: list with one element for each field
1362 ea03467c Iustin Pop
    @raise errors.OpExecError: when an invalid field
1363 ea03467c Iustin Pop
        has been passed
1364 ea03467c Iustin Pop

1365 ea03467c Iustin Pop
    """
1366 e2715f69 Michael Hanselmann
    row = []
1367 e2715f69 Michael Hanselmann
    for fname in fields:
1368 e2715f69 Michael Hanselmann
      if fname == "id":
1369 e2715f69 Michael Hanselmann
        row.append(job.id)
1370 e2715f69 Michael Hanselmann
      elif fname == "status":
1371 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
1372 af30b2fd Michael Hanselmann
      elif fname == "ops":
1373 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
1374 af30b2fd Michael Hanselmann
      elif fname == "opresult":
1375 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
1376 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
1377 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
1378 5b23c34c Iustin Pop
      elif fname == "oplog":
1379 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
1380 c56ec146 Iustin Pop
      elif fname == "opstart":
1381 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
1382 c56ec146 Iustin Pop
      elif fname == "opend":
1383 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
1384 c56ec146 Iustin Pop
      elif fname == "received_ts":
1385 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
1386 c56ec146 Iustin Pop
      elif fname == "start_ts":
1387 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
1388 c56ec146 Iustin Pop
      elif fname == "end_ts":
1389 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
1390 1d2dcdfd Michael Hanselmann
      elif fname == "lock_status":
1391 1d2dcdfd Michael Hanselmann
        row.append(job.lock_status)
1392 60dd1473 Iustin Pop
      elif fname == "summary":
1393 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
1394 e2715f69 Michael Hanselmann
      else:
1395 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1396 e2715f69 Michael Hanselmann
    return row
1397 e2715f69 Michael Hanselmann
1398 85f03e0d Michael Hanselmann
  @utils.LockedMethod
1399 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1400 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1401 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1402 e2715f69 Michael Hanselmann

1403 ea03467c Iustin Pop
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1404 ea03467c Iustin Pop
    processing for each job.
1405 ea03467c Iustin Pop

1406 ea03467c Iustin Pop
    @type job_ids: list
1407 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1408 ea03467c Iustin Pop
    @type fields: list
1409 ea03467c Iustin Pop
    @param fields: names of fields to return
1410 ea03467c Iustin Pop
    @rtype: list
1411 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1412 ea03467c Iustin Pop
        the requested fields
1413 e2715f69 Michael Hanselmann

1414 e2715f69 Michael Hanselmann
    """
1415 85f03e0d Michael Hanselmann
    jobs = []
1416 e2715f69 Michael Hanselmann
1417 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
1418 85f03e0d Michael Hanselmann
      if job is None:
1419 85f03e0d Michael Hanselmann
        jobs.append(None)
1420 85f03e0d Michael Hanselmann
      else:
1421 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1422 e2715f69 Michael Hanselmann
1423 85f03e0d Michael Hanselmann
    return jobs
1424 e2715f69 Michael Hanselmann
1425 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1426 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1427 e2715f69 Michael Hanselmann
  def Shutdown(self):
1428 e2715f69 Michael Hanselmann
    """Stops the job queue.
1429 e2715f69 Michael Hanselmann

1430 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1431 ea03467c Iustin Pop

1432 e2715f69 Michael Hanselmann
    """
1433 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1434 85f03e0d Michael Hanselmann
1435 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
1436 04ab05ce Michael Hanselmann
    self._queue_lock = None