Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ fcad7225

History | View | Annotate | Download (71.6 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 b95479a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 b95479a5 Michael Hanselmann
import threading
38 dfc8824a Michael Hanselmann
import itertools
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 f1da30e6 Michael Hanselmann
from ganeti import opcodes
52 7a1ecaed Iustin Pop
from ganeti import errors
53 e2715f69 Michael Hanselmann
from ganeti import mcpu
54 7996a135 Iustin Pop
from ganeti import utils
55 04ab05ce Michael Hanselmann
from ganeti import jstore
56 c3f0a12f Iustin Pop
from ganeti import rpc
57 82b22e19 René Nussbaumer
from ganeti import runtime
58 a744b676 Manuel Franceschini
from ganeti import netutils
59 989a8bee Michael Hanselmann
from ganeti import compat
60 b95479a5 Michael Hanselmann
from ganeti import ht
61 e2715f69 Michael Hanselmann
62 fbf0262f Michael Hanselmann
63 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
64 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
65 e2715f69 Michael Hanselmann
66 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
67 ebb80afa Guido Trotter
_LOCK = "_lock"
68 ebb80afa Guido Trotter
_QUEUE = "_queue"
69 99bd4f0a Guido Trotter
70 498ae1cc Iustin Pop
71 9728ae5d Iustin Pop
class CancelJob(Exception):
72 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
73 fbf0262f Michael Hanselmann

74 fbf0262f Michael Hanselmann
  """
75 fbf0262f Michael Hanselmann
76 fbf0262f Michael Hanselmann
77 70552c46 Michael Hanselmann
def TimeStampNow():
78 ea03467c Iustin Pop
  """Returns the current timestamp.
79 ea03467c Iustin Pop

80 ea03467c Iustin Pop
  @rtype: tuple
81 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
82 ea03467c Iustin Pop

83 ea03467c Iustin Pop
  """
84 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
85 70552c46 Michael Hanselmann
86 70552c46 Michael Hanselmann
87 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
88 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
89 e2715f69 Michael Hanselmann

90 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
91 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
92 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
93 ea03467c Iustin Pop
  @ivar status: the current status
94 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
95 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
96 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
97 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
98 f1048938 Iustin Pop

99 e2715f69 Michael Hanselmann
  """
100 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
101 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
102 66d895a8 Iustin Pop
               "__weakref__"]
103 66d895a8 Iustin Pop
104 85f03e0d Michael Hanselmann
  def __init__(self, op):
105 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
106 ea03467c Iustin Pop

107 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
108 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
109 ea03467c Iustin Pop

110 ea03467c Iustin Pop
    """
111 85f03e0d Michael Hanselmann
    self.input = op
112 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
113 85f03e0d Michael Hanselmann
    self.result = None
114 85f03e0d Michael Hanselmann
    self.log = []
115 70552c46 Michael Hanselmann
    self.start_timestamp = None
116 b9b5abcb Iustin Pop
    self.exec_timestamp = None
117 70552c46 Michael Hanselmann
    self.end_timestamp = None
118 f1da30e6 Michael Hanselmann
119 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
120 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
121 8f5c488d Michael Hanselmann
122 f1da30e6 Michael Hanselmann
  @classmethod
123 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
124 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    @type state: dict
127 ea03467c Iustin Pop
    @param state: the serialized state
128 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
129 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
130 ea03467c Iustin Pop

131 ea03467c Iustin Pop
    """
132 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
133 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
134 85f03e0d Michael Hanselmann
    obj.status = state["status"]
135 85f03e0d Michael Hanselmann
    obj.result = state["result"]
136 85f03e0d Michael Hanselmann
    obj.log = state["log"]
137 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
138 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
139 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
140 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
141 f1da30e6 Michael Hanselmann
    return obj
142 f1da30e6 Michael Hanselmann
143 f1da30e6 Michael Hanselmann
  def Serialize(self):
144 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
145 ea03467c Iustin Pop

146 ea03467c Iustin Pop
    @rtype: dict
147 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
148 ea03467c Iustin Pop

149 ea03467c Iustin Pop
    """
150 6c5a7090 Michael Hanselmann
    return {
151 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
152 6c5a7090 Michael Hanselmann
      "status": self.status,
153 6c5a7090 Michael Hanselmann
      "result": self.result,
154 6c5a7090 Michael Hanselmann
      "log": self.log,
155 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
156 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
157 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
158 8f5c488d Michael Hanselmann
      "priority": self.priority,
159 6c5a7090 Michael Hanselmann
      }
160 f1048938 Iustin Pop
161 e2715f69 Michael Hanselmann
162 e2715f69 Michael Hanselmann
class _QueuedJob(object):
163 e2715f69 Michael Hanselmann
  """In-memory job representation.
164 e2715f69 Michael Hanselmann

165 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
166 ea03467c Iustin Pop
  be taken care of by users of this class.
167 ea03467c Iustin Pop

168 ea03467c Iustin Pop
  @type queue: L{JobQueue}
169 ea03467c Iustin Pop
  @ivar queue: the parent queue
170 ea03467c Iustin Pop
  @ivar id: the job ID
171 ea03467c Iustin Pop
  @type ops: list
172 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
173 ea03467c Iustin Pop
  @type log_serial: int
174 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
175 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
176 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
177 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
178 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
179 e2715f69 Michael Hanselmann

180 e2715f69 Michael Hanselmann
  """
181 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
182 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
183 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
184 c0f6d0d8 Michael Hanselmann
               "__weakref__", "processor_lock", "writable"]
185 66d895a8 Iustin Pop
186 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
187 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
188 ea03467c Iustin Pop

189 ea03467c Iustin Pop
    @type queue: L{JobQueue}
190 ea03467c Iustin Pop
    @param queue: our parent queue
191 ea03467c Iustin Pop
    @type job_id: job_id
192 ea03467c Iustin Pop
    @param job_id: our job id
193 ea03467c Iustin Pop
    @type ops: list
194 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
195 ea03467c Iustin Pop
        in _QueuedOpCodes
196 c0f6d0d8 Michael Hanselmann
    @type writable: bool
197 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
198 ea03467c Iustin Pop

199 ea03467c Iustin Pop
    """
200 e2715f69 Michael Hanselmann
    if not ops:
201 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
202 e2715f69 Michael Hanselmann
203 85f03e0d Michael Hanselmann
    self.queue = queue
204 f1da30e6 Michael Hanselmann
    self.id = job_id
205 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
206 6c5a7090 Michael Hanselmann
    self.log_serial = 0
207 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
208 c56ec146 Iustin Pop
    self.start_timestamp = None
209 c56ec146 Iustin Pop
    self.end_timestamp = None
210 6c5a7090 Michael Hanselmann
211 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
212 fa4aa6b4 Michael Hanselmann
213 fa4aa6b4 Michael Hanselmann
  @staticmethod
214 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
215 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
216 fa4aa6b4 Michael Hanselmann

217 fa4aa6b4 Michael Hanselmann
    """
218 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
219 03b63608 Michael Hanselmann
    obj.ops_iter = None
220 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
221 f8a4adfa Michael Hanselmann
222 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
223 f8a4adfa Michael Hanselmann
    if writable:
224 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
225 f8a4adfa Michael Hanselmann
    else:
226 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
227 be760ba8 Michael Hanselmann
228 9fa2e150 Michael Hanselmann
  def __repr__(self):
229 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
230 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
231 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
232 9fa2e150 Michael Hanselmann
233 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
234 9fa2e150 Michael Hanselmann
235 f1da30e6 Michael Hanselmann
  @classmethod
236 c0f6d0d8 Michael Hanselmann
  def Restore(cls, queue, state, writable):
237 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
238 ea03467c Iustin Pop

239 ea03467c Iustin Pop
    @type queue: L{JobQueue}
240 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
241 ea03467c Iustin Pop
    @type state: dict
242 ea03467c Iustin Pop
    @param state: the serialized state
243 c0f6d0d8 Michael Hanselmann
    @type writable: bool
244 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
245 ea03467c Iustin Pop
    @rtype: _JobQueue
246 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
247 ea03467c Iustin Pop

248 ea03467c Iustin Pop
    """
249 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
250 85f03e0d Michael Hanselmann
    obj.queue = queue
251 85f03e0d Michael Hanselmann
    obj.id = state["id"]
252 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
253 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
254 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
255 6c5a7090 Michael Hanselmann
256 6c5a7090 Michael Hanselmann
    obj.ops = []
257 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
258 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
259 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
260 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
261 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
262 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
263 6c5a7090 Michael Hanselmann
264 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
265 be760ba8 Michael Hanselmann
266 f1da30e6 Michael Hanselmann
    return obj
267 f1da30e6 Michael Hanselmann
268 f1da30e6 Michael Hanselmann
  def Serialize(self):
269 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
270 ea03467c Iustin Pop

271 ea03467c Iustin Pop
    @rtype: dict
272 ea03467c Iustin Pop
    @return: the serialized state
273 ea03467c Iustin Pop

274 ea03467c Iustin Pop
    """
275 f1da30e6 Michael Hanselmann
    return {
276 f1da30e6 Michael Hanselmann
      "id": self.id,
277 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
278 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
279 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
280 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
281 f1da30e6 Michael Hanselmann
      }
282 f1da30e6 Michael Hanselmann
283 85f03e0d Michael Hanselmann
  def CalcStatus(self):
284 ea03467c Iustin Pop
    """Compute the status of this job.
285 ea03467c Iustin Pop

286 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
287 ea03467c Iustin Pop
    based on their status, computes the job status.
288 ea03467c Iustin Pop

289 ea03467c Iustin Pop
    The algorithm is:
290 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
291 ea03467c Iustin Pop
        status will be the same
292 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
293 ea03467c Iustin Pop
          - waitlock
294 fbf0262f Michael Hanselmann
          - canceling
295 ea03467c Iustin Pop
          - running
296 ea03467c Iustin Pop

297 ea03467c Iustin Pop
        will determine the job status
298 ea03467c Iustin Pop

299 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
300 ea03467c Iustin Pop
        and the job status will be the same
301 ea03467c Iustin Pop

302 ea03467c Iustin Pop
    @return: the job status
303 ea03467c Iustin Pop

304 ea03467c Iustin Pop
    """
305 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
306 e2715f69 Michael Hanselmann
307 e2715f69 Michael Hanselmann
    all_success = True
308 85f03e0d Michael Hanselmann
    for op in self.ops:
309 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
310 e2715f69 Michael Hanselmann
        continue
311 e2715f69 Michael Hanselmann
312 e2715f69 Michael Hanselmann
      all_success = False
313 e2715f69 Michael Hanselmann
314 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
315 e2715f69 Michael Hanselmann
        pass
316 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
317 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
318 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
319 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
320 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
321 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
322 fbf0262f Michael Hanselmann
        break
323 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
324 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
325 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
326 f1da30e6 Michael Hanselmann
        break
327 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
328 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
329 4cb1d919 Michael Hanselmann
        break
330 e2715f69 Michael Hanselmann
331 e2715f69 Michael Hanselmann
    if all_success:
332 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
333 e2715f69 Michael Hanselmann
334 e2715f69 Michael Hanselmann
    return status
335 e2715f69 Michael Hanselmann
336 8f5c488d Michael Hanselmann
  def CalcPriority(self):
337 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
338 8f5c488d Michael Hanselmann

339 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
340 8f5c488d Michael Hanselmann
    priority is used.
341 8f5c488d Michael Hanselmann

342 8f5c488d Michael Hanselmann
    @rtype: int
343 8f5c488d Michael Hanselmann

344 8f5c488d Michael Hanselmann
    """
345 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
346 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
347 8f5c488d Michael Hanselmann
348 8f5c488d Michael Hanselmann
    if not priorities:
349 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
350 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
351 8f5c488d Michael Hanselmann
352 8f5c488d Michael Hanselmann
    return min(priorities)
353 8f5c488d Michael Hanselmann
354 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
355 ea03467c Iustin Pop
    """Selectively returns the log entries.
356 ea03467c Iustin Pop

357 ea03467c Iustin Pop
    @type newer_than: None or int
358 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
359 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
360 ea03467c Iustin Pop
        than this value
361 ea03467c Iustin Pop
    @rtype: list
362 ea03467c Iustin Pop
    @return: the list of the log entries selected
363 ea03467c Iustin Pop

364 ea03467c Iustin Pop
    """
365 6c5a7090 Michael Hanselmann
    if newer_than is None:
366 6c5a7090 Michael Hanselmann
      serial = -1
367 6c5a7090 Michael Hanselmann
    else:
368 6c5a7090 Michael Hanselmann
      serial = newer_than
369 6c5a7090 Michael Hanselmann
370 6c5a7090 Michael Hanselmann
    entries = []
371 6c5a7090 Michael Hanselmann
    for op in self.ops:
372 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
373 6c5a7090 Michael Hanselmann
374 6c5a7090 Michael Hanselmann
    return entries
375 6c5a7090 Michael Hanselmann
376 6a290889 Guido Trotter
  def GetInfo(self, fields):
377 6a290889 Guido Trotter
    """Returns information about a job.
378 6a290889 Guido Trotter

379 6a290889 Guido Trotter
    @type fields: list
380 6a290889 Guido Trotter
    @param fields: names of fields to return
381 6a290889 Guido Trotter
    @rtype: list
382 6a290889 Guido Trotter
    @return: list with one element for each field
383 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
384 6a290889 Guido Trotter
        has been passed
385 6a290889 Guido Trotter

386 6a290889 Guido Trotter
    """
387 6a290889 Guido Trotter
    row = []
388 6a290889 Guido Trotter
    for fname in fields:
389 6a290889 Guido Trotter
      if fname == "id":
390 6a290889 Guido Trotter
        row.append(self.id)
391 6a290889 Guido Trotter
      elif fname == "status":
392 6a290889 Guido Trotter
        row.append(self.CalcStatus())
393 b8802cc4 Michael Hanselmann
      elif fname == "priority":
394 b8802cc4 Michael Hanselmann
        row.append(self.CalcPriority())
395 6a290889 Guido Trotter
      elif fname == "ops":
396 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
397 6a290889 Guido Trotter
      elif fname == "opresult":
398 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
399 6a290889 Guido Trotter
      elif fname == "opstatus":
400 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
401 6a290889 Guido Trotter
      elif fname == "oplog":
402 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
403 6a290889 Guido Trotter
      elif fname == "opstart":
404 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
405 6a290889 Guido Trotter
      elif fname == "opexec":
406 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
407 6a290889 Guido Trotter
      elif fname == "opend":
408 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
409 b8802cc4 Michael Hanselmann
      elif fname == "oppriority":
410 b8802cc4 Michael Hanselmann
        row.append([op.priority for op in self.ops])
411 6a290889 Guido Trotter
      elif fname == "received_ts":
412 6a290889 Guido Trotter
        row.append(self.received_timestamp)
413 6a290889 Guido Trotter
      elif fname == "start_ts":
414 6a290889 Guido Trotter
        row.append(self.start_timestamp)
415 6a290889 Guido Trotter
      elif fname == "end_ts":
416 6a290889 Guido Trotter
        row.append(self.end_timestamp)
417 6a290889 Guido Trotter
      elif fname == "summary":
418 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
419 6a290889 Guido Trotter
      else:
420 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
421 6a290889 Guido Trotter
    return row
422 6a290889 Guido Trotter
423 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
424 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
425 34327f51 Iustin Pop

426 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
427 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
428 34327f51 Iustin Pop
    finalised are not changed.
429 34327f51 Iustin Pop

430 34327f51 Iustin Pop
    @param status: a given opcode status
431 34327f51 Iustin Pop
    @param result: the opcode result
432 34327f51 Iustin Pop

433 34327f51 Iustin Pop
    """
434 747f6113 Michael Hanselmann
    not_marked = True
435 747f6113 Michael Hanselmann
    for op in self.ops:
436 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
437 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
438 747f6113 Michael Hanselmann
        continue
439 747f6113 Michael Hanselmann
      op.status = status
440 747f6113 Michael Hanselmann
      op.result = result
441 747f6113 Michael Hanselmann
      not_marked = False
442 34327f51 Iustin Pop
443 66bd7445 Michael Hanselmann
  def Finalize(self):
444 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
445 66bd7445 Michael Hanselmann

446 66bd7445 Michael Hanselmann
    """
447 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
448 66bd7445 Michael Hanselmann
449 099b2870 Michael Hanselmann
  def Cancel(self):
450 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
451 a0d2fe2c Michael Hanselmann

452 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
453 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
454 a0d2fe2c Michael Hanselmann
      as canceling and a text message
455 a0d2fe2c Michael Hanselmann

456 a0d2fe2c Michael Hanselmann
    """
457 099b2870 Michael Hanselmann
    status = self.CalcStatus()
458 099b2870 Michael Hanselmann
459 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
460 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
461 099b2870 Michael Hanselmann
                             "Job canceled by request")
462 66bd7445 Michael Hanselmann
      self.Finalize()
463 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
464 099b2870 Michael Hanselmann
465 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
466 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
467 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
468 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
469 099b2870 Michael Hanselmann
470 86b16e9d Michael Hanselmann
    else:
471 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
472 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
473 099b2870 Michael Hanselmann
474 f1048938 Iustin Pop
475 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
476 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
477 031a3e57 Michael Hanselmann
    """Initializes this class.
478 ea03467c Iustin Pop

479 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
480 031a3e57 Michael Hanselmann
    @param queue: Job queue
481 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
482 031a3e57 Michael Hanselmann
    @param job: Job object
483 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
484 031a3e57 Michael Hanselmann
    @param op: OpCode
485 031a3e57 Michael Hanselmann

486 031a3e57 Michael Hanselmann
    """
487 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
488 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
489 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
490 031a3e57 Michael Hanselmann
491 031a3e57 Michael Hanselmann
    self._queue = queue
492 031a3e57 Michael Hanselmann
    self._job = job
493 031a3e57 Michael Hanselmann
    self._op = op
494 031a3e57 Michael Hanselmann
495 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
496 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
497 dc1e2262 Michael Hanselmann

498 dc1e2262 Michael Hanselmann
    """
499 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
500 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
501 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
502 dc1e2262 Michael Hanselmann
      raise CancelJob()
503 dc1e2262 Michael Hanselmann
504 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
505 031a3e57 Michael Hanselmann
  def NotifyStart(self):
506 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
507 e92376d7 Iustin Pop

508 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
509 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
510 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
511 47099cd1 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
512 e92376d7 Iustin Pop

513 e92376d7 Iustin Pop
    """
514 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
515 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
516 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
517 fbf0262f Michael Hanselmann
518 271daef8 Iustin Pop
    # Cancel here if we were asked to
519 dc1e2262 Michael Hanselmann
    self._CheckCancel()
520 fbf0262f Michael Hanselmann
521 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
522 9bdab621 Michael Hanselmann
523 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
524 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
525 271daef8 Iustin Pop
526 271daef8 Iustin Pop
    # And finally replicate the job status
527 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
528 031a3e57 Michael Hanselmann
529 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
530 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
531 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
532 9bf5e01f Guido Trotter

533 9bf5e01f Guido Trotter
    """
534 9bf5e01f Guido Trotter
    self._job.log_serial += 1
535 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
536 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
537 9bf5e01f Guido Trotter
538 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
539 031a3e57 Michael Hanselmann
    """Append a log entry.
540 031a3e57 Michael Hanselmann

541 031a3e57 Michael Hanselmann
    """
542 031a3e57 Michael Hanselmann
    assert len(args) < 3
543 031a3e57 Michael Hanselmann
544 031a3e57 Michael Hanselmann
    if len(args) == 1:
545 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
546 031a3e57 Michael Hanselmann
      log_msg = args[0]
547 031a3e57 Michael Hanselmann
    else:
548 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
549 031a3e57 Michael Hanselmann
550 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
551 031a3e57 Michael Hanselmann
    # precision.
552 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
553 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
554 031a3e57 Michael Hanselmann
555 acf931b7 Michael Hanselmann
  def CheckCancel(self):
556 acf931b7 Michael Hanselmann
    """Check whether job has been cancelled.
557 ef2df7d3 Michael Hanselmann

558 ef2df7d3 Michael Hanselmann
    """
559 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
560 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
561 dc1e2262 Michael Hanselmann
562 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
563 dc1e2262 Michael Hanselmann
    self._CheckCancel()
564 dc1e2262 Michael Hanselmann
565 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
566 6a373640 Michael Hanselmann
    """Submits jobs for processing.
567 6a373640 Michael Hanselmann

568 6a373640 Michael Hanselmann
    See L{JobQueue.SubmitManyJobs}.
569 6a373640 Michael Hanselmann

570 6a373640 Michael Hanselmann
    """
571 6a373640 Michael Hanselmann
    # Locking is done in job queue
572 6a373640 Michael Hanselmann
    return self._queue.SubmitManyJobs(jobs)
573 6a373640 Michael Hanselmann
574 031a3e57 Michael Hanselmann
575 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
576 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
577 989a8bee Michael Hanselmann
    """Initializes this class.
578 6c2549d6 Guido Trotter

579 989a8bee Michael Hanselmann
    @type fields: list of strings
580 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
581 989a8bee Michael Hanselmann
    @type prev_job_info: string
582 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
583 989a8bee Michael Hanselmann
    @type prev_log_serial: string
584 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
585 6c2549d6 Guido Trotter

586 989a8bee Michael Hanselmann
    """
587 989a8bee Michael Hanselmann
    self._fields = fields
588 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
589 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
590 6c2549d6 Guido Trotter
591 989a8bee Michael Hanselmann
  def __call__(self, job):
592 989a8bee Michael Hanselmann
    """Checks whether job has changed.
593 6c2549d6 Guido Trotter

594 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
595 989a8bee Michael Hanselmann
    @param job: Job object
596 6c2549d6 Guido Trotter

597 6c2549d6 Guido Trotter
    """
598 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Expected read-only job"
599 c0f6d0d8 Michael Hanselmann
600 989a8bee Michael Hanselmann
    status = job.CalcStatus()
601 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
602 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
603 6c2549d6 Guido Trotter
604 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
605 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
606 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
607 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
608 6c2549d6 Guido Trotter
    # significantly different.
609 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
610 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
611 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
612 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
613 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
614 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
615 6c2549d6 Guido Trotter
616 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
617 6c2549d6 Guido Trotter
    # no changes.
618 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
619 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
620 47099cd1 Michael Hanselmann
                       constants.JOB_STATUS_WAITING) or
621 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
622 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
623 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
624 989a8bee Michael Hanselmann
      return (job_info, log_entries)
625 6c2549d6 Guido Trotter
626 989a8bee Michael Hanselmann
    return None
627 989a8bee Michael Hanselmann
628 989a8bee Michael Hanselmann
629 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
630 989a8bee Michael Hanselmann
  def __init__(self, filename):
631 989a8bee Michael Hanselmann
    """Initializes this class.
632 989a8bee Michael Hanselmann

633 989a8bee Michael Hanselmann
    @type filename: string
634 989a8bee Michael Hanselmann
    @param filename: Path to job file
635 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
636 6c2549d6 Guido Trotter

637 989a8bee Michael Hanselmann
    """
638 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
639 989a8bee Michael Hanselmann
    self._inotify_handler = \
640 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
641 989a8bee Michael Hanselmann
    self._notifier = \
642 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
643 989a8bee Michael Hanselmann
    try:
644 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
645 989a8bee Michael Hanselmann
    except Exception:
646 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
647 989a8bee Michael Hanselmann
      self._notifier.stop()
648 989a8bee Michael Hanselmann
      raise
649 989a8bee Michael Hanselmann
650 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
651 989a8bee Michael Hanselmann
    """Callback for inotify.
652 989a8bee Michael Hanselmann

653 989a8bee Michael Hanselmann
    """
654 6c2549d6 Guido Trotter
    if not notifier_enabled:
655 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
656 989a8bee Michael Hanselmann
657 989a8bee Michael Hanselmann
  def Wait(self, timeout):
658 989a8bee Michael Hanselmann
    """Waits for the job file to change.
659 989a8bee Michael Hanselmann

660 989a8bee Michael Hanselmann
    @type timeout: float
661 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
662 989a8bee Michael Hanselmann
    @return: Whether there have been events
663 989a8bee Michael Hanselmann

664 989a8bee Michael Hanselmann
    """
665 989a8bee Michael Hanselmann
    assert timeout >= 0
666 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
667 989a8bee Michael Hanselmann
    if have_events:
668 989a8bee Michael Hanselmann
      self._notifier.read_events()
669 989a8bee Michael Hanselmann
    self._notifier.process_events()
670 989a8bee Michael Hanselmann
    return have_events
671 989a8bee Michael Hanselmann
672 989a8bee Michael Hanselmann
  def Close(self):
673 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
674 989a8bee Michael Hanselmann

675 989a8bee Michael Hanselmann
    """
676 989a8bee Michael Hanselmann
    self._notifier.stop()
677 989a8bee Michael Hanselmann
678 989a8bee Michael Hanselmann
679 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
680 989a8bee Michael Hanselmann
  def __init__(self, filename):
681 989a8bee Michael Hanselmann
    """Initializes this class.
682 989a8bee Michael Hanselmann

683 989a8bee Michael Hanselmann
    @type filename: string
684 989a8bee Michael Hanselmann
    @param filename: Path to job file
685 989a8bee Michael Hanselmann

686 989a8bee Michael Hanselmann
    """
687 989a8bee Michael Hanselmann
    self._filewaiter = None
688 989a8bee Michael Hanselmann
    self._filename = filename
689 6c2549d6 Guido Trotter
690 989a8bee Michael Hanselmann
  def Wait(self, timeout):
691 989a8bee Michael Hanselmann
    """Waits for a job to change.
692 6c2549d6 Guido Trotter

693 989a8bee Michael Hanselmann
    @type timeout: float
694 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
695 989a8bee Michael Hanselmann
    @return: Whether there have been events
696 989a8bee Michael Hanselmann

697 989a8bee Michael Hanselmann
    """
698 989a8bee Michael Hanselmann
    if self._filewaiter:
699 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
700 989a8bee Michael Hanselmann
701 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
702 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
703 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
704 989a8bee Michael Hanselmann
    # race condition.
705 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
706 989a8bee Michael Hanselmann
707 989a8bee Michael Hanselmann
    return True
708 989a8bee Michael Hanselmann
709 989a8bee Michael Hanselmann
  def Close(self):
710 989a8bee Michael Hanselmann
    """Closes underlying waiter.
711 989a8bee Michael Hanselmann

712 989a8bee Michael Hanselmann
    """
713 989a8bee Michael Hanselmann
    if self._filewaiter:
714 989a8bee Michael Hanselmann
      self._filewaiter.Close()
715 989a8bee Michael Hanselmann
716 989a8bee Michael Hanselmann
717 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
718 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
719 989a8bee Michael Hanselmann

720 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
721 989a8bee Michael Hanselmann
  the current job status has changed.
722 989a8bee Michael Hanselmann

723 989a8bee Michael Hanselmann
  """
724 989a8bee Michael Hanselmann
  @staticmethod
725 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
726 dfc8824a Michael Hanselmann
    if counter.next() > 0:
727 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
728 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
729 dfc8824a Michael Hanselmann
      # changes/messages.
730 dfc8824a Michael Hanselmann
      time.sleep(0.1)
731 dfc8824a Michael Hanselmann
732 989a8bee Michael Hanselmann
    job = job_load_fn()
733 989a8bee Michael Hanselmann
    if not job:
734 989a8bee Michael Hanselmann
      raise errors.JobLost()
735 989a8bee Michael Hanselmann
736 989a8bee Michael Hanselmann
    result = check_fn(job)
737 989a8bee Michael Hanselmann
    if result is None:
738 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
739 989a8bee Michael Hanselmann
740 989a8bee Michael Hanselmann
    return result
741 989a8bee Michael Hanselmann
742 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
743 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
744 989a8bee Michael Hanselmann
    """Waits for changes on a job.
745 989a8bee Michael Hanselmann

746 989a8bee Michael Hanselmann
    @type filename: string
747 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
748 989a8bee Michael Hanselmann
    @type job_load_fn: callable
749 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
750 989a8bee Michael Hanselmann
    @type fields: list of strings
751 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
752 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
753 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
754 989a8bee Michael Hanselmann
    @type prev_log_serial: int
755 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
756 989a8bee Michael Hanselmann
    @type timeout: float
757 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
758 989a8bee Michael Hanselmann

759 989a8bee Michael Hanselmann
    """
760 dfc8824a Michael Hanselmann
    counter = itertools.count()
761 6c2549d6 Guido Trotter
    try:
762 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
763 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
764 989a8bee Michael Hanselmann
      try:
765 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
766 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
767 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
768 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
769 989a8bee Michael Hanselmann
      finally:
770 989a8bee Michael Hanselmann
        waiter.Close()
771 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
772 6c2549d6 Guido Trotter
      return None
773 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
774 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
775 6c2549d6 Guido Trotter
776 6c2549d6 Guido Trotter
777 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
778 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
779 6760e4ed Michael Hanselmann

780 6760e4ed Michael Hanselmann
  """
781 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
782 6760e4ed Michael Hanselmann
    to_encode = err
783 6760e4ed Michael Hanselmann
  else:
784 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
785 6760e4ed Michael Hanselmann
786 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
787 6760e4ed Michael Hanselmann
788 6760e4ed Michael Hanselmann
789 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
790 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
791 26d3fd2f Michael Hanselmann
    """Initializes this class.
792 26d3fd2f Michael Hanselmann

793 26d3fd2f Michael Hanselmann
    """
794 26d3fd2f Michael Hanselmann
    self._fn = fn
795 26d3fd2f Michael Hanselmann
    self._next = None
796 26d3fd2f Michael Hanselmann
797 26d3fd2f Michael Hanselmann
  def _Advance(self):
798 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
799 26d3fd2f Michael Hanselmann

800 26d3fd2f Michael Hanselmann
    """
801 26d3fd2f Michael Hanselmann
    if self._next is None:
802 26d3fd2f Michael Hanselmann
      self._next = self._fn()
803 26d3fd2f Michael Hanselmann
804 26d3fd2f Michael Hanselmann
  def Peek(self):
805 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
806 26d3fd2f Michael Hanselmann

807 26d3fd2f Michael Hanselmann
    """
808 26d3fd2f Michael Hanselmann
    self._Advance()
809 26d3fd2f Michael Hanselmann
    return self._next
810 26d3fd2f Michael Hanselmann
811 26d3fd2f Michael Hanselmann
  def Next(self):
812 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
813 26d3fd2f Michael Hanselmann

814 26d3fd2f Michael Hanselmann
    """
815 26d3fd2f Michael Hanselmann
    self._Advance()
816 26d3fd2f Michael Hanselmann
    result = self._next
817 26d3fd2f Michael Hanselmann
    self._next = None
818 26d3fd2f Michael Hanselmann
    return result
819 26d3fd2f Michael Hanselmann
820 26d3fd2f Michael Hanselmann
821 b80cc518 Michael Hanselmann
class _OpExecContext:
822 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
823 b80cc518 Michael Hanselmann
    """Initializes this class.
824 b80cc518 Michael Hanselmann

825 b80cc518 Michael Hanselmann
    """
826 b80cc518 Michael Hanselmann
    self.op = op
827 b80cc518 Michael Hanselmann
    self.index = index
828 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
829 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
830 b80cc518 Michael Hanselmann
831 b95479a5 Michael Hanselmann
    # Create local copy to modify
832 b95479a5 Michael Hanselmann
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
833 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
834 b95479a5 Michael Hanselmann
    else:
835 b95479a5 Michael Hanselmann
      self.jobdeps = None
836 b95479a5 Michael Hanselmann
837 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
838 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
839 26d3fd2f Michael Hanselmann
840 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
841 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
842 26d3fd2f Michael Hanselmann

843 26d3fd2f Michael Hanselmann
    """
844 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
845 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
846 26d3fd2f Michael Hanselmann
847 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
848 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
849 26d3fd2f Michael Hanselmann

850 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
851 26d3fd2f Michael Hanselmann

852 26d3fd2f Michael Hanselmann
    """
853 26d3fd2f Michael Hanselmann
    op = self.op
854 26d3fd2f Michael Hanselmann
855 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
856 26d3fd2f Michael Hanselmann
    # for locks?
857 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
858 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
859 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
860 26d3fd2f Michael Hanselmann
      op.priority -= 1
861 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
862 26d3fd2f Michael Hanselmann
      return True
863 26d3fd2f Michael Hanselmann
864 26d3fd2f Michael Hanselmann
    return False
865 26d3fd2f Michael Hanselmann
866 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
867 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
868 26d3fd2f Michael Hanselmann

869 26d3fd2f Michael Hanselmann
    """
870 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
871 26d3fd2f Michael Hanselmann
872 b80cc518 Michael Hanselmann
873 be760ba8 Michael Hanselmann
class _JobProcessor(object):
874 75d81fc8 Michael Hanselmann
  (DEFER,
875 75d81fc8 Michael Hanselmann
   WAITDEP,
876 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
877 75d81fc8 Michael Hanselmann
878 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
879 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
880 be760ba8 Michael Hanselmann
    """Initializes this class.
881 be760ba8 Michael Hanselmann

882 be760ba8 Michael Hanselmann
    """
883 be760ba8 Michael Hanselmann
    self.queue = queue
884 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
885 be760ba8 Michael Hanselmann
    self.job = job
886 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
887 be760ba8 Michael Hanselmann
888 be760ba8 Michael Hanselmann
  @staticmethod
889 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
890 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
891 be760ba8 Michael Hanselmann

892 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
893 be760ba8 Michael Hanselmann
    @param job: Job object
894 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
895 be760ba8 Michael Hanselmann

896 be760ba8 Michael Hanselmann
    """
897 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
898 be760ba8 Michael Hanselmann
    # lookups
899 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
900 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
901 03b63608 Michael Hanselmann
    if job.ops_iter is None:
902 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
903 be760ba8 Michael Hanselmann
904 be760ba8 Michael Hanselmann
    # Find next opcode to run
905 be760ba8 Michael Hanselmann
    while True:
906 be760ba8 Michael Hanselmann
      try:
907 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
908 be760ba8 Michael Hanselmann
      except StopIteration:
909 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
910 be760ba8 Michael Hanselmann
911 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
912 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
913 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
914 be760ba8 Michael Hanselmann
915 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
916 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
917 be760ba8 Michael Hanselmann
918 66bd7445 Michael Hanselmann
      if op.status not in constants.OPS_FINALIZED:
919 66bd7445 Michael Hanselmann
        return opctx
920 be760ba8 Michael Hanselmann
921 66bd7445 Michael Hanselmann
      # This is a job that was partially completed before master daemon
922 66bd7445 Michael Hanselmann
      # shutdown, so it can be expected that some opcodes are already
923 66bd7445 Michael Hanselmann
      # completed successfully (if any did error out, then the whole job
924 66bd7445 Michael Hanselmann
      # should have been aborted and not resubmitted for processing).
925 66bd7445 Michael Hanselmann
      logging.info("%s: opcode %s already processed, skipping",
926 66bd7445 Michael Hanselmann
                   opctx.log_prefix, opctx.summary)
927 be760ba8 Michael Hanselmann
928 be760ba8 Michael Hanselmann
  @staticmethod
929 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
930 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
931 be760ba8 Michael Hanselmann

932 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
933 be760ba8 Michael Hanselmann

934 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
935 be760ba8 Michael Hanselmann
    @param job: Job object
936 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
937 a38e8674 Michael Hanselmann
    @param op: Opcode object
938 be760ba8 Michael Hanselmann

939 be760ba8 Michael Hanselmann
    """
940 be760ba8 Michael Hanselmann
    assert op in job.ops
941 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
942 47099cd1 Michael Hanselmann
                         constants.OP_STATUS_WAITING)
943 5fd6b694 Michael Hanselmann
944 5fd6b694 Michael Hanselmann
    update = False
945 be760ba8 Michael Hanselmann
946 be760ba8 Michael Hanselmann
    op.result = None
947 5fd6b694 Michael Hanselmann
948 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
949 47099cd1 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITING
950 5fd6b694 Michael Hanselmann
      update = True
951 5fd6b694 Michael Hanselmann
952 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
953 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
954 5fd6b694 Michael Hanselmann
      update = True
955 be760ba8 Michael Hanselmann
956 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
957 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
958 5fd6b694 Michael Hanselmann
      update = True
959 5fd6b694 Michael Hanselmann
960 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
961 5fd6b694 Michael Hanselmann
962 5fd6b694 Michael Hanselmann
    return update
963 be760ba8 Michael Hanselmann
964 b95479a5 Michael Hanselmann
  @staticmethod
965 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
966 b95479a5 Michael Hanselmann
    """Checks if an opcode has dependencies and if so, processes them.
967 b95479a5 Michael Hanselmann

968 b95479a5 Michael Hanselmann
    @type queue: L{JobQueue}
969 b95479a5 Michael Hanselmann
    @param queue: Queue object
970 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
971 b95479a5 Michael Hanselmann
    @param job: Job object
972 b95479a5 Michael Hanselmann
    @type opctx: L{_OpExecContext}
973 b95479a5 Michael Hanselmann
    @param opctx: Opcode execution context
974 b95479a5 Michael Hanselmann
    @rtype: bool
975 b95479a5 Michael Hanselmann
    @return: Whether opcode will be re-scheduled by dependency tracker
976 b95479a5 Michael Hanselmann

977 b95479a5 Michael Hanselmann
    """
978 b95479a5 Michael Hanselmann
    op = opctx.op
979 b95479a5 Michael Hanselmann
980 b95479a5 Michael Hanselmann
    result = False
981 b95479a5 Michael Hanselmann
982 b95479a5 Michael Hanselmann
    while opctx.jobdeps:
983 b95479a5 Michael Hanselmann
      (dep_job_id, dep_status) = opctx.jobdeps[0]
984 b95479a5 Michael Hanselmann
985 b95479a5 Michael Hanselmann
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
986 b95479a5 Michael Hanselmann
                                                          dep_status)
987 b95479a5 Michael Hanselmann
      assert ht.TNonEmptyString(depmsg), "No dependency message"
988 b95479a5 Michael Hanselmann
989 b95479a5 Michael Hanselmann
      logging.info("%s: %s", opctx.log_prefix, depmsg)
990 b95479a5 Michael Hanselmann
991 b95479a5 Michael Hanselmann
      if depresult == _JobDependencyManager.CONTINUE:
992 b95479a5 Michael Hanselmann
        # Remove dependency and continue
993 b95479a5 Michael Hanselmann
        opctx.jobdeps.pop(0)
994 b95479a5 Michael Hanselmann
995 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.WAIT:
996 b95479a5 Michael Hanselmann
        # Need to wait for notification, dependency tracker will re-add job
997 b95479a5 Michael Hanselmann
        # to workerpool
998 b95479a5 Michael Hanselmann
        result = True
999 b95479a5 Michael Hanselmann
        break
1000 b95479a5 Michael Hanselmann
1001 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.CANCEL:
1002 b95479a5 Michael Hanselmann
        # Job was cancelled, cancel this job as well
1003 b95479a5 Michael Hanselmann
        job.Cancel()
1004 b95479a5 Michael Hanselmann
        assert op.status == constants.OP_STATUS_CANCELING
1005 b95479a5 Michael Hanselmann
        break
1006 b95479a5 Michael Hanselmann
1007 b95479a5 Michael Hanselmann
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1008 b95479a5 Michael Hanselmann
                         _JobDependencyManager.ERROR):
1009 b95479a5 Michael Hanselmann
        # Job failed or there was an error, this job must fail
1010 b95479a5 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1011 b95479a5 Michael Hanselmann
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1012 b95479a5 Michael Hanselmann
        break
1013 b95479a5 Michael Hanselmann
1014 b95479a5 Michael Hanselmann
      else:
1015 b95479a5 Michael Hanselmann
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1016 b95479a5 Michael Hanselmann
                                     depresult)
1017 b95479a5 Michael Hanselmann
1018 b95479a5 Michael Hanselmann
    return result
1019 b95479a5 Michael Hanselmann
1020 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1021 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1022 be760ba8 Michael Hanselmann

1023 be760ba8 Michael Hanselmann
    """
1024 b80cc518 Michael Hanselmann
    op = opctx.op
1025 b80cc518 Michael Hanselmann
1026 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1027 be760ba8 Michael Hanselmann
1028 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1029 26d3fd2f Michael Hanselmann
1030 be760ba8 Michael Hanselmann
    try:
1031 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1032 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1033 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1034 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1035 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1036 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1037 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1038 9e49dfc5 Michael Hanselmann
1039 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1040 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1041 9e49dfc5 Michael Hanselmann
1042 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1043 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1044 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1045 9e49dfc5 Michael Hanselmann
1046 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1047 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1048 be760ba8 Michael Hanselmann
    except CancelJob:
1049 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1050 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1051 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1052 be760ba8 Michael Hanselmann
    except Exception, err: # pylint: disable-msg=W0703
1053 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1054 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1055 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1056 be760ba8 Michael Hanselmann
    else:
1057 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1058 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1059 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1060 be760ba8 Michael Hanselmann
1061 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1062 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1063 be760ba8 Michael Hanselmann

1064 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1065 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1066 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1067 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1068 be760ba8 Michael Hanselmann

1069 be760ba8 Michael Hanselmann
    """
1070 be760ba8 Michael Hanselmann
    queue = self.queue
1071 be760ba8 Michael Hanselmann
    job = self.job
1072 be760ba8 Michael Hanselmann
1073 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1074 be760ba8 Michael Hanselmann
1075 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1076 be760ba8 Michael Hanselmann
    try:
1077 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1078 be760ba8 Michael Hanselmann
1079 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1080 c0f6d0d8 Michael Hanselmann
1081 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1082 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1083 75d81fc8 Michael Hanselmann
        return self.FINISHED
1084 66bd7445 Michael Hanselmann
1085 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1086 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1087 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1088 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1089 26d3fd2f Michael Hanselmann
      else:
1090 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1091 26d3fd2f Michael Hanselmann
          _nextop_fn()
1092 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1093 26d3fd2f Michael Hanselmann
1094 b80cc518 Michael Hanselmann
      op = opctx.op
1095 be760ba8 Michael Hanselmann
1096 be760ba8 Michael Hanselmann
      # Consistency check
1097 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1098 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1099 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1100 be760ba8 Michael Hanselmann
1101 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1102 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1103 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1104 be760ba8 Michael Hanselmann
1105 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1106 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1107 26d3fd2f Michael Hanselmann
1108 b95479a5 Michael Hanselmann
      waitjob = None
1109 b95479a5 Michael Hanselmann
1110 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1111 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1112 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1113 30c945d0 Michael Hanselmann
1114 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1115 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1116 5fd6b694 Michael Hanselmann
          # Write to disk
1117 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1118 be760ba8 Michael Hanselmann
1119 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1120 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1121 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1122 b95479a5 Michael Hanselmann
        assert waitjob is None
1123 b95479a5 Michael Hanselmann
1124 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1125 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1126 be760ba8 Michael Hanselmann
1127 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1128 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1129 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1130 be760ba8 Michael Hanselmann
1131 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1132 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1133 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1134 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1135 be760ba8 Michael Hanselmann
1136 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1137 b95479a5 Michael Hanselmann
1138 b95479a5 Michael Hanselmann
          queue.release()
1139 b95479a5 Michael Hanselmann
          try:
1140 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1141 b95479a5 Michael Hanselmann
          finally:
1142 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1143 b95479a5 Michael Hanselmann
1144 b95479a5 Michael Hanselmann
          op.status = op_status
1145 b95479a5 Michael Hanselmann
          op.result = op_result
1146 b95479a5 Michael Hanselmann
1147 b95479a5 Michael Hanselmann
          assert not waitjob
1148 be760ba8 Michael Hanselmann
1149 47099cd1 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITING:
1150 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1151 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1152 be760ba8 Michael Hanselmann
        else:
1153 26d3fd2f Michael Hanselmann
          # Finalize opcode
1154 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1155 be760ba8 Michael Hanselmann
1156 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1157 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1158 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1159 26d3fd2f Michael Hanselmann
          else:
1160 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1161 be760ba8 Michael Hanselmann
1162 47099cd1 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1163 be760ba8 Michael Hanselmann
        finalize = False
1164 be760ba8 Michael Hanselmann
1165 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1166 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1167 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1168 be760ba8 Michael Hanselmann
1169 26d3fd2f Michael Hanselmann
        # Keep around for another round
1170 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1171 be760ba8 Michael Hanselmann
1172 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1173 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1174 be760ba8 Michael Hanselmann
1175 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1176 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1177 be760ba8 Michael Hanselmann
1178 be760ba8 Michael Hanselmann
      else:
1179 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1180 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1181 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1182 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1183 26d3fd2f Michael Hanselmann
1184 26d3fd2f Michael Hanselmann
        # Reset context
1185 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1186 26d3fd2f Michael Hanselmann
1187 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1188 26d3fd2f Michael Hanselmann
          finalize = False
1189 26d3fd2f Michael Hanselmann
1190 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1191 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1192 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1193 26d3fd2f Michael Hanselmann
1194 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1195 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1196 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1197 26d3fd2f Michael Hanselmann
          finalize = True
1198 be760ba8 Michael Hanselmann
1199 26d3fd2f Michael Hanselmann
          # Consistency check
1200 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1201 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1202 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1203 be760ba8 Michael Hanselmann
1204 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1205 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1206 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1207 26d3fd2f Michael Hanselmann
          finalize = True
1208 26d3fd2f Michael Hanselmann
1209 26d3fd2f Michael Hanselmann
        else:
1210 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1211 26d3fd2f Michael Hanselmann
1212 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1213 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1214 66bd7445 Michael Hanselmann
          finalize = True
1215 66bd7445 Michael Hanselmann
1216 66bd7445 Michael Hanselmann
        if finalize:
1217 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1218 66bd7445 Michael Hanselmann
          job.Finalize()
1219 26d3fd2f Michael Hanselmann
1220 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1221 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1222 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1223 be760ba8 Michael Hanselmann
1224 b95479a5 Michael Hanselmann
        assert not waitjob
1225 b95479a5 Michael Hanselmann
1226 66bd7445 Michael Hanselmann
        if finalize:
1227 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1228 75d81fc8 Michael Hanselmann
          return self.FINISHED
1229 be760ba8 Michael Hanselmann
1230 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1231 b95479a5 Michael Hanselmann
1232 75d81fc8 Michael Hanselmann
      if waitjob:
1233 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1234 75d81fc8 Michael Hanselmann
      else:
1235 75d81fc8 Michael Hanselmann
        return self.DEFER
1236 be760ba8 Michael Hanselmann
    finally:
1237 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1238 be760ba8 Michael Hanselmann
      queue.release()
1239 be760ba8 Michael Hanselmann
1240 be760ba8 Michael Hanselmann
1241 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
1242 031a3e57 Michael Hanselmann
  """The actual job workers.
1243 031a3e57 Michael Hanselmann

1244 031a3e57 Michael Hanselmann
  """
1245 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
1246 e2715f69 Michael Hanselmann
    """Job executor.
1247 e2715f69 Michael Hanselmann

1248 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1249 ea03467c Iustin Pop
    @param job: the job to be processed
1250 ea03467c Iustin Pop

1251 e2715f69 Michael Hanselmann
    """
1252 f8a4adfa Michael Hanselmann
    assert job.writable, "Expected writable job"
1253 f8a4adfa Michael Hanselmann
1254 b95479a5 Michael Hanselmann
    # Ensure only one worker is active on a single job. If a job registers for
1255 b95479a5 Michael Hanselmann
    # a dependency job, and the other job notifies before the first worker is
1256 b95479a5 Michael Hanselmann
    # done, the job can end up in the tasklist more than once.
1257 b95479a5 Michael Hanselmann
    job.processor_lock.acquire()
1258 b95479a5 Michael Hanselmann
    try:
1259 b95479a5 Michael Hanselmann
      return self._RunTaskInner(job)
1260 b95479a5 Michael Hanselmann
    finally:
1261 b95479a5 Michael Hanselmann
      job.processor_lock.release()
1262 b95479a5 Michael Hanselmann
1263 b95479a5 Michael Hanselmann
  def _RunTaskInner(self, job):
1264 b95479a5 Michael Hanselmann
    """Executes a job.
1265 b95479a5 Michael Hanselmann

1266 b95479a5 Michael Hanselmann
    Must be called with per-job lock acquired.
1267 b95479a5 Michael Hanselmann

1268 b95479a5 Michael Hanselmann
    """
1269 be760ba8 Michael Hanselmann
    queue = job.queue
1270 be760ba8 Michael Hanselmann
    assert queue == self.pool.queue
1271 be760ba8 Michael Hanselmann
1272 0aeeb6e3 Michael Hanselmann
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1273 0aeeb6e3 Michael Hanselmann
    setname_fn(None)
1274 daba67c7 Michael Hanselmann
1275 be760ba8 Michael Hanselmann
    proc = mcpu.Processor(queue.context, job.id)
1276 be760ba8 Michael Hanselmann
1277 0aeeb6e3 Michael Hanselmann
    # Create wrapper for setting thread name
1278 0aeeb6e3 Michael Hanselmann
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1279 0aeeb6e3 Michael Hanselmann
                                    proc.ExecOpCode)
1280 0aeeb6e3 Michael Hanselmann
1281 75d81fc8 Michael Hanselmann
    result = _JobProcessor(queue, wrap_execop_fn, job)()
1282 75d81fc8 Michael Hanselmann
1283 75d81fc8 Michael Hanselmann
    if result == _JobProcessor.FINISHED:
1284 75d81fc8 Michael Hanselmann
      # Notify waiting jobs
1285 75d81fc8 Michael Hanselmann
      queue.depmgr.NotifyWaiters(job.id)
1286 75d81fc8 Michael Hanselmann
1287 75d81fc8 Michael Hanselmann
    elif result == _JobProcessor.DEFER:
1288 be760ba8 Michael Hanselmann
      # Schedule again
1289 26d3fd2f Michael Hanselmann
      raise workerpool.DeferTask(priority=job.CalcPriority())
1290 e2715f69 Michael Hanselmann
1291 75d81fc8 Michael Hanselmann
    elif result == _JobProcessor.WAITDEP:
1292 75d81fc8 Michael Hanselmann
      # No-op, dependency manager will re-schedule
1293 75d81fc8 Michael Hanselmann
      pass
1294 75d81fc8 Michael Hanselmann
1295 75d81fc8 Michael Hanselmann
    else:
1296 75d81fc8 Michael Hanselmann
      raise errors.ProgrammerError("Job processor returned unknown status %s" %
1297 75d81fc8 Michael Hanselmann
                                   (result, ))
1298 75d81fc8 Michael Hanselmann
1299 0aeeb6e3 Michael Hanselmann
  @staticmethod
1300 0aeeb6e3 Michael Hanselmann
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1301 0aeeb6e3 Michael Hanselmann
    """Updates the worker thread name to include a short summary of the opcode.
1302 0aeeb6e3 Michael Hanselmann

1303 0aeeb6e3 Michael Hanselmann
    @param setname_fn: Callable setting worker thread name
1304 0aeeb6e3 Michael Hanselmann
    @param execop_fn: Callable for executing opcode (usually
1305 0aeeb6e3 Michael Hanselmann
                      L{mcpu.Processor.ExecOpCode})
1306 0aeeb6e3 Michael Hanselmann

1307 0aeeb6e3 Michael Hanselmann
    """
1308 0aeeb6e3 Michael Hanselmann
    setname_fn(op)
1309 0aeeb6e3 Michael Hanselmann
    try:
1310 0aeeb6e3 Michael Hanselmann
      return execop_fn(op, *args, **kwargs)
1311 0aeeb6e3 Michael Hanselmann
    finally:
1312 0aeeb6e3 Michael Hanselmann
      setname_fn(None)
1313 0aeeb6e3 Michael Hanselmann
1314 0aeeb6e3 Michael Hanselmann
  @staticmethod
1315 0aeeb6e3 Michael Hanselmann
  def _GetWorkerName(job, op):
1316 0aeeb6e3 Michael Hanselmann
    """Sets the worker thread name.
1317 0aeeb6e3 Michael Hanselmann

1318 0aeeb6e3 Michael Hanselmann
    @type job: L{_QueuedJob}
1319 0aeeb6e3 Michael Hanselmann
    @type op: L{opcodes.OpCode}
1320 0aeeb6e3 Michael Hanselmann

1321 0aeeb6e3 Michael Hanselmann
    """
1322 0aeeb6e3 Michael Hanselmann
    parts = ["Job%s" % job.id]
1323 0aeeb6e3 Michael Hanselmann
1324 0aeeb6e3 Michael Hanselmann
    if op:
1325 0aeeb6e3 Michael Hanselmann
      parts.append(op.TinySummary())
1326 0aeeb6e3 Michael Hanselmann
1327 0aeeb6e3 Michael Hanselmann
    return "/".join(parts)
1328 0aeeb6e3 Michael Hanselmann
1329 e2715f69 Michael Hanselmann
1330 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
1331 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
1332 ea03467c Iustin Pop

1333 ea03467c Iustin Pop
  """
1334 5bdce580 Michael Hanselmann
  def __init__(self, queue):
1335 0aeeb6e3 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("Jq",
1336 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
1337 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
1338 5bdce580 Michael Hanselmann
    self.queue = queue
1339 e2715f69 Michael Hanselmann
1340 e2715f69 Michael Hanselmann
1341 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1342 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1343 b95479a5 Michael Hanselmann

1344 b95479a5 Michael Hanselmann
  """
1345 b95479a5 Michael Hanselmann
  (WAIT,
1346 b95479a5 Michael Hanselmann
   ERROR,
1347 b95479a5 Michael Hanselmann
   CANCEL,
1348 b95479a5 Michael Hanselmann
   CONTINUE,
1349 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1350 b95479a5 Michael Hanselmann
1351 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1352 b95479a5 Michael Hanselmann
    """Initializes this class.
1353 b95479a5 Michael Hanselmann

1354 b95479a5 Michael Hanselmann
    """
1355 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1356 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1357 b95479a5 Michael Hanselmann
1358 b95479a5 Michael Hanselmann
    self._waiters = {}
1359 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1360 b95479a5 Michael Hanselmann
1361 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1362 fcb21ad7 Michael Hanselmann
  def GetLockInfo(self, requested): # pylint: disable-msg=W0613
1363 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1364 fcb21ad7 Michael Hanselmann

1365 fcb21ad7 Michael Hanselmann
    @type requested: set
1366 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1367 fcb21ad7 Michael Hanselmann

1368 fcb21ad7 Michael Hanselmann
    """
1369 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1370 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1371 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1372 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1373 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1374 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1375 fcb21ad7 Michael Hanselmann
            if waiters]
1376 fcb21ad7 Michael Hanselmann
1377 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1378 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1379 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1380 b95479a5 Michael Hanselmann

1381 b95479a5 Michael Hanselmann
    """
1382 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1383 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1384 b95479a5 Michael Hanselmann
1385 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1386 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1387 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1388 b95479a5 Michael Hanselmann

1389 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1390 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1391 b95479a5 Michael Hanselmann

1392 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1393 b95479a5 Michael Hanselmann
    @param job: Job object
1394 b95479a5 Michael Hanselmann
    @type dep_job_id: string
1395 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1396 b95479a5 Michael Hanselmann
    @type dep_status: list
1397 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1398 b95479a5 Michael Hanselmann

1399 b95479a5 Michael Hanselmann
    """
1400 b95479a5 Michael Hanselmann
    assert ht.TString(job.id)
1401 b95479a5 Michael Hanselmann
    assert ht.TString(dep_job_id)
1402 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1403 b95479a5 Michael Hanselmann
1404 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1405 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1406 b95479a5 Michael Hanselmann
1407 b95479a5 Michael Hanselmann
    # Get status of dependency job
1408 b95479a5 Michael Hanselmann
    try:
1409 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1410 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1411 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1412 b95479a5 Michael Hanselmann
1413 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1414 b95479a5 Michael Hanselmann
1415 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1416 b95479a5 Michael Hanselmann
1417 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1418 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1419 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1420 b95479a5 Michael Hanselmann
      return (self.WAIT,
1421 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1422 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1423 b95479a5 Michael Hanselmann
1424 b95479a5 Michael Hanselmann
    # Remove from waiters list
1425 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1426 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1427 b95479a5 Michael Hanselmann
1428 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1429 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1430 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1431 b95479a5 Michael Hanselmann
1432 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1433 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1434 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1435 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1436 b95479a5 Michael Hanselmann
1437 b95479a5 Michael Hanselmann
    else:
1438 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1439 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1440 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1441 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1442 b95479a5 Michael Hanselmann
1443 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1444 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1445 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1446 b95479a5 Michael Hanselmann

1447 b95479a5 Michael Hanselmann
    @type job_id: string
1448 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1449 b95479a5 Michael Hanselmann

1450 b95479a5 Michael Hanselmann
    """
1451 b95479a5 Michael Hanselmann
    assert ht.TString(job_id)
1452 b95479a5 Michael Hanselmann
1453 b95479a5 Michael Hanselmann
    jobs = self._waiters.pop(job_id, None)
1454 b95479a5 Michael Hanselmann
    if jobs:
1455 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1456 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1457 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1458 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1459 b95479a5 Michael Hanselmann
1460 b95479a5 Michael Hanselmann
    # Remove all jobs without actual waiters
1461 b95479a5 Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1462 b95479a5 Michael Hanselmann
                   if not waiters]:
1463 b95479a5 Michael Hanselmann
      del self._waiters[job_id]
1464 b95479a5 Michael Hanselmann
1465 b95479a5 Michael Hanselmann
1466 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1467 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1468 ea03467c Iustin Pop

1469 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1470 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1471 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1472 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1473 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1474 ea03467c Iustin Pop

1475 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1476 f1da30e6 Michael Hanselmann

1477 6c881c52 Iustin Pop
  Example::
1478 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1479 6c881c52 Iustin Pop
    @_RequireOpenQueue
1480 6c881c52 Iustin Pop
    def Example(self):
1481 6c881c52 Iustin Pop
      pass
1482 db37da70 Michael Hanselmann

1483 6c881c52 Iustin Pop
  """
1484 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1485 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
1486 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1487 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1488 6c881c52 Iustin Pop
  return wrapper
1489 db37da70 Michael Hanselmann
1490 db37da70 Michael Hanselmann
1491 6c881c52 Iustin Pop
class JobQueue(object):
1492 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1493 db37da70 Michael Hanselmann

1494 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1495 6c881c52 Iustin Pop

1496 6c881c52 Iustin Pop
  """
1497 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1498 db37da70 Michael Hanselmann
1499 85f03e0d Michael Hanselmann
  def __init__(self, context):
1500 ea03467c Iustin Pop
    """Constructor for JobQueue.
1501 ea03467c Iustin Pop

1502 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
1503 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
1504 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
1505 ea03467c Iustin Pop
    running).
1506 ea03467c Iustin Pop

1507 ea03467c Iustin Pop
    @type context: GanetiContext
1508 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
1509 ea03467c Iustin Pop
        data and other ganeti objects
1510 ea03467c Iustin Pop

1511 ea03467c Iustin Pop
    """
1512 5bdce580 Michael Hanselmann
    self.context = context
1513 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
1514 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
1515 f1da30e6 Michael Hanselmann
1516 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
1517 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
1518 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
1519 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
1520 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
1521 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
1522 ebb80afa Guido Trotter
1523 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
1524 ebb80afa Guido Trotter
    self.release = self._lock.release
1525 85f03e0d Michael Hanselmann
1526 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
1527 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
1528 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1529 f1da30e6 Michael Hanselmann
1530 04ab05ce Michael Hanselmann
    # Read serial file
1531 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
1532 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
1533 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
1534 c4beba1c Iustin Pop
1535 23752136 Michael Hanselmann
    # Get initial list of nodes
1536 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
1537 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
1538 59303563 Iustin Pop
                       if n.master_candidate)
1539 8e00939c Michael Hanselmann
1540 8e00939c Michael Hanselmann
    # Remove master node
1541 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
1542 23752136 Michael Hanselmann
1543 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
1544 23752136 Michael Hanselmann
1545 20571a26 Guido Trotter
    self._queue_size = 0
1546 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1547 ff699aa9 Michael Hanselmann
    self._drained = jstore.CheckDrainFlag()
1548 20571a26 Guido Trotter
1549 b95479a5 Michael Hanselmann
    # Job dependencies
1550 b95479a5 Michael Hanselmann
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1551 b95479a5 Michael Hanselmann
                                        self._EnqueueJobs)
1552 fcb21ad7 Michael Hanselmann
    self.context.glm.AddToLockMonitor(self.depmgr)
1553 b95479a5 Michael Hanselmann
1554 85f03e0d Michael Hanselmann
    # Setup worker pool
1555 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
1556 85f03e0d Michael Hanselmann
    try:
1557 de9d02c7 Michael Hanselmann
      self._InspectQueue()
1558 de9d02c7 Michael Hanselmann
    except:
1559 de9d02c7 Michael Hanselmann
      self._wpool.TerminateWorkers()
1560 de9d02c7 Michael Hanselmann
      raise
1561 711b5124 Michael Hanselmann
1562 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1563 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
1564 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
1565 de9d02c7 Michael Hanselmann
    """Loads the whole job queue and resumes unfinished jobs.
1566 de9d02c7 Michael Hanselmann

1567 de9d02c7 Michael Hanselmann
    This function needs the lock here because WorkerPool.AddTask() may start a
1568 de9d02c7 Michael Hanselmann
    job while we're still doing our work.
1569 711b5124 Michael Hanselmann

1570 de9d02c7 Michael Hanselmann
    """
1571 de9d02c7 Michael Hanselmann
    logging.info("Inspecting job queue")
1572 de9d02c7 Michael Hanselmann
1573 7b5c4a69 Michael Hanselmann
    restartjobs = []
1574 7b5c4a69 Michael Hanselmann
1575 de9d02c7 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked()
1576 de9d02c7 Michael Hanselmann
    jobs_count = len(all_job_ids)
1577 de9d02c7 Michael Hanselmann
    lastinfo = time.time()
1578 de9d02c7 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1579 de9d02c7 Michael Hanselmann
      # Give an update every 1000 jobs or 10 seconds
1580 de9d02c7 Michael Hanselmann
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1581 de9d02c7 Michael Hanselmann
          idx == (jobs_count - 1)):
1582 de9d02c7 Michael Hanselmann
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1583 de9d02c7 Michael Hanselmann
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1584 711b5124 Michael Hanselmann
        lastinfo = time.time()
1585 94ed59a5 Iustin Pop
1586 de9d02c7 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1587 85f03e0d Michael Hanselmann
1588 de9d02c7 Michael Hanselmann
      # a failure in loading the job can cause 'None' to be returned
1589 de9d02c7 Michael Hanselmann
      if job is None:
1590 de9d02c7 Michael Hanselmann
        continue
1591 85f03e0d Michael Hanselmann
1592 de9d02c7 Michael Hanselmann
      status = job.CalcStatus()
1593 711b5124 Michael Hanselmann
1594 320d1daf Michael Hanselmann
      if status == constants.JOB_STATUS_QUEUED:
1595 7b5c4a69 Michael Hanselmann
        restartjobs.append(job)
1596 de9d02c7 Michael Hanselmann
1597 de9d02c7 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING,
1598 47099cd1 Michael Hanselmann
                      constants.JOB_STATUS_WAITING,
1599 de9d02c7 Michael Hanselmann
                      constants.JOB_STATUS_CANCELING):
1600 de9d02c7 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
1601 320d1daf Michael Hanselmann
1602 47099cd1 Michael Hanselmann
        if status == constants.JOB_STATUS_WAITING:
1603 320d1daf Michael Hanselmann
          # Restart job
1604 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1605 320d1daf Michael Hanselmann
          restartjobs.append(job)
1606 320d1daf Michael Hanselmann
        else:
1607 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1608 320d1daf Michael Hanselmann
                                "Unclean master daemon shutdown")
1609 45df0793 Michael Hanselmann
          job.Finalize()
1610 320d1daf Michael Hanselmann
1611 de9d02c7 Michael Hanselmann
        self.UpdateJobUnlocked(job)
1612 de9d02c7 Michael Hanselmann
1613 7b5c4a69 Michael Hanselmann
    if restartjobs:
1614 7b5c4a69 Michael Hanselmann
      logging.info("Restarting %s jobs", len(restartjobs))
1615 75d81fc8 Michael Hanselmann
      self._EnqueueJobsUnlocked(restartjobs)
1616 7b5c4a69 Michael Hanselmann
1617 de9d02c7 Michael Hanselmann
    logging.info("Job queue inspection finished")
1618 85f03e0d Michael Hanselmann
1619 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1620 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1621 99aabbed Iustin Pop
  def AddNode(self, node):
1622 99aabbed Iustin Pop
    """Register a new node with the queue.
1623 99aabbed Iustin Pop

1624 99aabbed Iustin Pop
    @type node: L{objects.Node}
1625 99aabbed Iustin Pop
    @param node: the node object to be added
1626 99aabbed Iustin Pop

1627 99aabbed Iustin Pop
    """
1628 99aabbed Iustin Pop
    node_name = node.name
1629 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1630 23752136 Michael Hanselmann
1631 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1632 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1633 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1634 c8457ce7 Iustin Pop
    if msg:
1635 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1636 c8457ce7 Iustin Pop
                      node_name, msg)
1637 23752136 Michael Hanselmann
1638 59303563 Iustin Pop
    if not node.master_candidate:
1639 59303563 Iustin Pop
      # remove if existing, ignoring errors
1640 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1641 59303563 Iustin Pop
      # and skip the replication of the job ids
1642 59303563 Iustin Pop
      return
1643 59303563 Iustin Pop
1644 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1645 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1646 23752136 Michael Hanselmann
1647 d2e03a33 Michael Hanselmann
    # Upload current serial file
1648 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1649 d2e03a33 Michael Hanselmann
1650 d2e03a33 Michael Hanselmann
    for file_name in files:
1651 9f774ee8 Michael Hanselmann
      # Read file content
1652 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1653 9f774ee8 Michael Hanselmann
1654 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1655 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
1656 a3811745 Michael Hanselmann
                                                  file_name, content)
1657 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1658 c8457ce7 Iustin Pop
      if msg:
1659 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1660 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1661 d2e03a33 Michael Hanselmann
1662 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1663 d2e03a33 Michael Hanselmann
1664 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1665 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1666 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1667 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1668 ea03467c Iustin Pop

1669 ea03467c Iustin Pop
    @type node_name: str
1670 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1671 ea03467c Iustin Pop

1672 ea03467c Iustin Pop
    """
1673 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1674 23752136 Michael Hanselmann
1675 7e950d31 Iustin Pop
  @staticmethod
1676 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1677 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1678 ea03467c Iustin Pop

1679 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1680 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1681 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1682 ea03467c Iustin Pop

1683 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1684 ea03467c Iustin Pop
    @type nodes: list
1685 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1686 ea03467c Iustin Pop
    @type failmsg: str
1687 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1688 ea03467c Iustin Pop

1689 ea03467c Iustin Pop
    """
1690 e74798c1 Michael Hanselmann
    failed = []
1691 e74798c1 Michael Hanselmann
    success = []
1692 e74798c1 Michael Hanselmann
1693 e74798c1 Michael Hanselmann
    for node in nodes:
1694 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1695 c8457ce7 Iustin Pop
      if msg:
1696 e74798c1 Michael Hanselmann
        failed.append(node)
1697 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1698 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1699 c8457ce7 Iustin Pop
      else:
1700 c8457ce7 Iustin Pop
        success.append(node)
1701 e74798c1 Michael Hanselmann
1702 e74798c1 Michael Hanselmann
    # +1 for the master node
1703 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1704 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1705 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1706 e74798c1 Michael Hanselmann
1707 99aabbed Iustin Pop
  def _GetNodeIp(self):
1708 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1709 99aabbed Iustin Pop

1710 ea03467c Iustin Pop
    @rtype: (list, list)
1711 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1712 ea03467c Iustin Pop
        names and the second one with the node addresses
1713 ea03467c Iustin Pop

1714 99aabbed Iustin Pop
    """
1715 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1716 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1717 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1718 99aabbed Iustin Pop
    return name_list, addr_list
1719 99aabbed Iustin Pop
1720 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1721 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1722 8e00939c Michael Hanselmann

1723 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1724 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1725 ea03467c Iustin Pop

1726 ea03467c Iustin Pop
    @type file_name: str
1727 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1728 ea03467c Iustin Pop
    @type data: str
1729 ea03467c Iustin Pop
    @param data: the new contents of the file
1730 4c36bdf5 Guido Trotter
    @type replicate: boolean
1731 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1732 ea03467c Iustin Pop

1733 8e00939c Michael Hanselmann
    """
1734 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1735 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1736 82b22e19 René Nussbaumer
                    gid=getents.masterd_gid)
1737 8e00939c Michael Hanselmann
1738 4c36bdf5 Guido Trotter
    if replicate:
1739 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1740 4c36bdf5 Guido Trotter
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1741 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1742 23752136 Michael Hanselmann
1743 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1744 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1745 ea03467c Iustin Pop

1746 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1747 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1748 ea03467c Iustin Pop

1749 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1750 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1751 ea03467c Iustin Pop

1752 ea03467c Iustin Pop
    """
1753 dd875d32 Michael Hanselmann
    # Rename them locally
1754 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1755 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1756 abc1f2ce Michael Hanselmann
1757 dd875d32 Michael Hanselmann
    # ... and on all nodes
1758 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1759 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1760 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1761 abc1f2ce Michael Hanselmann
1762 7e950d31 Iustin Pop
  @staticmethod
1763 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
1764 ea03467c Iustin Pop
    """Convert a job ID to string format.
1765 ea03467c Iustin Pop

1766 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
1767 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
1768 ea03467c Iustin Pop
    abstract this change.
1769 ea03467c Iustin Pop

1770 ea03467c Iustin Pop
    @type job_id: int or long
1771 ea03467c Iustin Pop
    @param job_id: the numeric job id
1772 ea03467c Iustin Pop
    @rtype: str
1773 ea03467c Iustin Pop
    @return: the formatted job id
1774 ea03467c Iustin Pop

1775 ea03467c Iustin Pop
    """
1776 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
1777 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1778 85f03e0d Michael Hanselmann
    if job_id < 0:
1779 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1780 85f03e0d Michael Hanselmann
1781 85f03e0d Michael Hanselmann
    return str(job_id)
1782 85f03e0d Michael Hanselmann
1783 58b22b6e Michael Hanselmann
  @classmethod
1784 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
1785 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
1786 58b22b6e Michael Hanselmann

1787 58b22b6e Michael Hanselmann
    @type job_id: str
1788 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
1789 58b22b6e Michael Hanselmann
    @rtype: str
1790 58b22b6e Michael Hanselmann
    @return: Directory name
1791 58b22b6e Michael Hanselmann

1792 58b22b6e Michael Hanselmann
    """
1793 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1794 58b22b6e Michael Hanselmann
1795 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1796 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1797 f1da30e6 Michael Hanselmann

1798 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1799 f1da30e6 Michael Hanselmann

1800 009e73d0 Iustin Pop
    @type count: integer
1801 009e73d0 Iustin Pop
    @param count: how many serials to return
1802 ea03467c Iustin Pop
    @rtype: str
1803 ea03467c Iustin Pop
    @return: a string representing the job identifier.
1804 f1da30e6 Michael Hanselmann

1805 f1da30e6 Michael Hanselmann
    """
1806 009e73d0 Iustin Pop
    assert count > 0
1807 f1da30e6 Michael Hanselmann
    # New number
1808 009e73d0 Iustin Pop
    serial = self._last_serial + count
1809 f1da30e6 Michael Hanselmann
1810 f1da30e6 Michael Hanselmann
    # Write to file
1811 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1812 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1813 f1da30e6 Michael Hanselmann
1814 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
1815 3c88bf36 Michael Hanselmann
              for v in range(self._last_serial + 1, serial + 1)]
1816 3c88bf36 Michael Hanselmann
1817 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1818 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1819 f1da30e6 Michael Hanselmann
1820 3c88bf36 Michael Hanselmann
    assert len(result) == count
1821 3c88bf36 Michael Hanselmann
1822 009e73d0 Iustin Pop
    return result
1823 f1da30e6 Michael Hanselmann
1824 85f03e0d Michael Hanselmann
  @staticmethod
1825 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1826 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1827 ea03467c Iustin Pop

1828 ea03467c Iustin Pop
    @type job_id: str
1829 ea03467c Iustin Pop
    @param job_id: the job identifier
1830 ea03467c Iustin Pop
    @rtype: str
1831 ea03467c Iustin Pop
    @return: the path to the job file
1832 ea03467c Iustin Pop

1833 ea03467c Iustin Pop
    """
1834 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1835 f1da30e6 Michael Hanselmann
1836 58b22b6e Michael Hanselmann
  @classmethod
1837 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
1838 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1839 ea03467c Iustin Pop

1840 ea03467c Iustin Pop
    @type job_id: str
1841 ea03467c Iustin Pop
    @param job_id: the job identifier
1842 ea03467c Iustin Pop
    @rtype: str
1843 ea03467c Iustin Pop
    @return: the path to the archived job file
1844 ea03467c Iustin Pop

1845 ea03467c Iustin Pop
    """
1846 0411c011 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1847 0411c011 Iustin Pop
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1848 0cb94105 Michael Hanselmann
1849 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
1850 911a495b Iustin Pop
    """Return all known job IDs.
1851 911a495b Iustin Pop

1852 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1853 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1854 ac0930b9 Iustin Pop
    extra IDs).
1855 ac0930b9 Iustin Pop

1856 85a1c57d Guido Trotter
    @type sort: boolean
1857 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1858 ea03467c Iustin Pop
    @rtype: list
1859 ea03467c Iustin Pop
    @return: the list of job IDs
1860 ea03467c Iustin Pop

1861 911a495b Iustin Pop
    """
1862 85a1c57d Guido Trotter
    jlist = []
1863 b5b8309d Guido Trotter
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1864 85a1c57d Guido Trotter
      m = self._RE_JOB_FILE.match(filename)
1865 85a1c57d Guido Trotter
      if m:
1866 85a1c57d Guido Trotter
        jlist.append(m.group(1))
1867 85a1c57d Guido Trotter
    if sort:
1868 85a1c57d Guido Trotter
      jlist = utils.NiceSort(jlist)
1869 f0d874fe Iustin Pop
    return jlist
1870 911a495b Iustin Pop
1871 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1872 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1873 ea03467c Iustin Pop

1874 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1875 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1876 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1877 ea03467c Iustin Pop

1878 ea03467c Iustin Pop
    @param job_id: the job id
1879 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1880 ea03467c Iustin Pop
    @return: either None or the job object
1881 ea03467c Iustin Pop

1882 ea03467c Iustin Pop
    """
1883 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1884 5685c1a5 Michael Hanselmann
    if job:
1885 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1886 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
1887 5685c1a5 Michael Hanselmann
      return job
1888 ac0930b9 Iustin Pop
1889 3d6c5566 Guido Trotter
    try:
1890 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
1891 aa9f8167 Iustin Pop
      if job is None:
1892 aa9f8167 Iustin Pop
        return job
1893 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1894 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1895 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1896 3d6c5566 Guido Trotter
      if old_path == new_path:
1897 3d6c5566 Guido Trotter
        # job already archived (future case)
1898 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1899 3d6c5566 Guido Trotter
      else:
1900 3d6c5566 Guido Trotter
        # non-archived case
1901 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1902 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1903 3d6c5566 Guido Trotter
      return None
1904 162c8636 Guido Trotter
1905 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
1906 c0f6d0d8 Michael Hanselmann
1907 162c8636 Guido Trotter
    self._memcache[job_id] = job
1908 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1909 162c8636 Guido Trotter
    return job
1910 162c8636 Guido Trotter
1911 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1912 162c8636 Guido Trotter
    """Load the given job file from disk.
1913 162c8636 Guido Trotter

1914 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1915 162c8636 Guido Trotter

1916 162c8636 Guido Trotter
    @type job_id: string
1917 162c8636 Guido Trotter
    @param job_id: job identifier
1918 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1919 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1920 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1921 162c8636 Guido Trotter
    @return: either None or the job object
1922 162c8636 Guido Trotter

1923 162c8636 Guido Trotter
    """
1924 c0f6d0d8 Michael Hanselmann
    path_functions = [(self._GetJobPath, True)]
1925 194c8ca4 Michael Hanselmann
1926 194c8ca4 Michael Hanselmann
    if try_archived:
1927 c0f6d0d8 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, False))
1928 194c8ca4 Michael Hanselmann
1929 194c8ca4 Michael Hanselmann
    raw_data = None
1930 c0f6d0d8 Michael Hanselmann
    writable_default = None
1931 194c8ca4 Michael Hanselmann
1932 c0f6d0d8 Michael Hanselmann
    for (fn, writable_default) in path_functions:
1933 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
1934 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
1935 194c8ca4 Michael Hanselmann
      try:
1936 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
1937 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
1938 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
1939 194c8ca4 Michael Hanselmann
          raise
1940 194c8ca4 Michael Hanselmann
      else:
1941 194c8ca4 Michael Hanselmann
        break
1942 194c8ca4 Michael Hanselmann
1943 194c8ca4 Michael Hanselmann
    if not raw_data:
1944 194c8ca4 Michael Hanselmann
      return None
1945 13998ef2 Michael Hanselmann
1946 c0f6d0d8 Michael Hanselmann
    if writable is None:
1947 c0f6d0d8 Michael Hanselmann
      writable = writable_default
1948 c0f6d0d8 Michael Hanselmann
1949 94ed59a5 Iustin Pop
    try:
1950 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1951 c0f6d0d8 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable)
1952 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1953 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1954 94ed59a5 Iustin Pop
1955 ac0930b9 Iustin Pop
    return job
1956 f1da30e6 Michael Hanselmann
1957 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1958 0f9c08dc Guido Trotter
    """Load the given job file from disk.
1959 0f9c08dc Guido Trotter

1960 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1961 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
1962 0f9c08dc Guido Trotter
    exception is logged.
1963 0f9c08dc Guido Trotter

1964 0f9c08dc Guido Trotter
    @type job_id: string
1965 0f9c08dc Guido Trotter
    @param job_id: job identifier
1966 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1967 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1968 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
1969 0f9c08dc Guido Trotter
    @return: either None or the job object
1970 0f9c08dc Guido Trotter

1971 0f9c08dc Guido Trotter
    """
1972 0f9c08dc Guido Trotter
    try:
1973 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1974 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
1975 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
1976 0f9c08dc Guido Trotter
      return None
1977 0f9c08dc Guido Trotter
1978 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1979 20571a26 Guido Trotter
    """Update the queue size.
1980 20571a26 Guido Trotter

1981 20571a26 Guido Trotter
    """
1982 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1983 20571a26 Guido Trotter
1984 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1985 20571a26 Guido Trotter
  @_RequireOpenQueue
1986 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
1987 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1988 3ccafd0e Iustin Pop

1989 ea03467c Iustin Pop
    @type drain_flag: boolean
1990 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1991 ea03467c Iustin Pop

1992 3ccafd0e Iustin Pop
    """
1993 ff699aa9 Michael Hanselmann
    jstore.SetDrainFlag(drain_flag)
1994 20571a26 Guido Trotter
1995 20571a26 Guido Trotter
    self._drained = drain_flag
1996 20571a26 Guido Trotter
1997 3ccafd0e Iustin Pop
    return True
1998 3ccafd0e Iustin Pop
1999 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2000 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
2001 85f03e0d Michael Hanselmann
    """Create and store a new job.
2002 f1da30e6 Michael Hanselmann

2003 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
2004 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
2005 c3f0a12f Iustin Pop

2006 009e73d0 Iustin Pop
    @type job_id: job ID
2007 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
2008 c3f0a12f Iustin Pop
    @type ops: list
2009 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
2010 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
2011 7beb1e53 Guido Trotter
    @return: the job object to be queued
2012 7beb1e53 Guido Trotter
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
2013 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2014 e71c8147 Michael Hanselmann
    @raise errors.GenericError: If an opcode is not valid
2015 c3f0a12f Iustin Pop

2016 c3f0a12f Iustin Pop
    """
2017 20571a26 Guido Trotter
    # Ok when sharing the big job queue lock, as the drain file is created when
2018 20571a26 Guido Trotter
    # the lock is exclusive.
2019 20571a26 Guido Trotter
    if self._drained:
2020 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
2021 f87b405e Michael Hanselmann
2022 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2023 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
2024 f87b405e Michael Hanselmann
2025 c0f6d0d8 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops, True)
2026 f1da30e6 Michael Hanselmann
2027 e71c8147 Michael Hanselmann
    # Check priority
2028 e71c8147 Michael Hanselmann
    for idx, op in enumerate(job.ops):
2029 e71c8147 Michael Hanselmann
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2030 e71c8147 Michael Hanselmann
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2031 e71c8147 Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2032 e71c8147 Michael Hanselmann
                                  " are %s" % (idx, op.priority, allowed))
2033 e71c8147 Michael Hanselmann
2034 b247c6fc Michael Hanselmann
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2035 b247c6fc Michael Hanselmann
      if not opcodes.TNoRelativeJobDependencies(dependencies):
2036 b247c6fc Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2037 b247c6fc Michael Hanselmann
                                  " match %s: %s" %
2038 b247c6fc Michael Hanselmann
                                  (idx, opcodes.TNoRelativeJobDependencies,
2039 b247c6fc Michael Hanselmann
                                   dependencies))
2040 b247c6fc Michael Hanselmann
2041 f1da30e6 Michael Hanselmann
    # Write to disk
2042 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
2043 f1da30e6 Michael Hanselmann
2044 20571a26 Guido Trotter
    self._queue_size += 1
2045 20571a26 Guido Trotter
2046 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
2047 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
2048 ac0930b9 Iustin Pop
2049 7beb1e53 Guido Trotter
    return job
2050 f1da30e6 Michael Hanselmann
2051 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2052 2971c913 Iustin Pop
  @_RequireOpenQueue
2053 2971c913 Iustin Pop
  def SubmitJob(self, ops):
2054 2971c913 Iustin Pop
    """Create and store a new job.
2055 2971c913 Iustin Pop

2056 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2057 2971c913 Iustin Pop

2058 2971c913 Iustin Pop
    """
2059 b247c6fc Michael Hanselmann
    (job_id, ) = self._NewSerialsUnlocked(1)
2060 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2061 7beb1e53 Guido Trotter
    return job_id
2062 2971c913 Iustin Pop
2063 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2064 2971c913 Iustin Pop
  @_RequireOpenQueue
2065 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
2066 2971c913 Iustin Pop
    """Create and store multiple jobs.
2067 2971c913 Iustin Pop

2068 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2069 2971c913 Iustin Pop

2070 2971c913 Iustin Pop
    """
2071 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2072 b247c6fc Michael Hanselmann
2073 b247c6fc Michael Hanselmann
    (results, added_jobs) = \
2074 b247c6fc Michael Hanselmann
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2075 7b5c4a69 Michael Hanselmann
2076 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked(added_jobs)
2077 2971c913 Iustin Pop
2078 2971c913 Iustin Pop
    return results
2079 2971c913 Iustin Pop
2080 b247c6fc Michael Hanselmann
  @staticmethod
2081 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
2082 b247c6fc Michael Hanselmann
    """Formats errors which occurred while submitting a job.
2083 b247c6fc Michael Hanselmann

2084 b247c6fc Michael Hanselmann
    """
2085 b247c6fc Michael Hanselmann
    return ("%s; opcodes %s" %
2086 b247c6fc Michael Hanselmann
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2087 b247c6fc Michael Hanselmann
2088 b247c6fc Michael Hanselmann
  @staticmethod
2089 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
2090 b247c6fc Michael Hanselmann
    """Resolves relative job IDs in dependencies.
2091 b247c6fc Michael Hanselmann

2092 b247c6fc Michael Hanselmann
    @type resolve_fn: callable
2093 b247c6fc Michael Hanselmann
    @param resolve_fn: Function to resolve a relative job ID
2094 b247c6fc Michael Hanselmann
    @type deps: list
2095 b247c6fc Michael Hanselmann
    @param deps: Dependencies
2096 b247c6fc Michael Hanselmann
    @rtype: list
2097 b247c6fc Michael Hanselmann
    @return: Resolved dependencies
2098 b247c6fc Michael Hanselmann

2099 b247c6fc Michael Hanselmann
    """
2100 b247c6fc Michael Hanselmann
    result = []
2101 b247c6fc Michael Hanselmann
2102 b247c6fc Michael Hanselmann
    for (dep_job_id, dep_status) in deps:
2103 b247c6fc Michael Hanselmann
      if ht.TRelativeJobId(dep_job_id):
2104 b247c6fc Michael Hanselmann
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2105 b247c6fc Michael Hanselmann
        try:
2106 b247c6fc Michael Hanselmann
          job_id = resolve_fn(dep_job_id)
2107 b247c6fc Michael Hanselmann
        except IndexError:
2108 b247c6fc Michael Hanselmann
          # Abort
2109 b247c6fc Michael Hanselmann
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2110 b247c6fc Michael Hanselmann
      else:
2111 b247c6fc Michael Hanselmann
        job_id = dep_job_id
2112 b247c6fc Michael Hanselmann
2113 b247c6fc Michael Hanselmann
      result.append((job_id, dep_status))
2114 b247c6fc Michael Hanselmann
2115 b247c6fc Michael Hanselmann
    return (True, result)
2116 b247c6fc Michael Hanselmann
2117 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2118 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2119 b247c6fc Michael Hanselmann

2120 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2121 b247c6fc Michael Hanselmann

2122 b247c6fc Michael Hanselmann
    """
2123 b247c6fc Michael Hanselmann
    results = []
2124 b247c6fc Michael Hanselmann
    added_jobs = []
2125 b247c6fc Michael Hanselmann
2126 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2127 b247c6fc Michael Hanselmann
      assert reljobid < 0
2128 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2129 b247c6fc Michael Hanselmann
2130 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2131 b247c6fc Michael Hanselmann
      for op in ops:
2132 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2133 b247c6fc Michael Hanselmann
          (status, data) = \
2134 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2135 b247c6fc Michael Hanselmann
                                         op.depends)
2136 b247c6fc Michael Hanselmann
          if not status:
2137 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2138 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2139 b247c6fc Michael Hanselmann
            break
2140 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2141 b247c6fc Michael Hanselmann
          op.depends = data
2142 b247c6fc Michael Hanselmann
      else:
2143 b247c6fc Michael Hanselmann
        try:
2144 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2145 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2146 b247c6fc Michael Hanselmann
          status = False
2147 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2148 b247c6fc Michael Hanselmann
        else:
2149 b247c6fc Michael Hanselmann
          status = True
2150 b247c6fc Michael Hanselmann
          data = job_id
2151 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2152 b247c6fc Michael Hanselmann
2153 b247c6fc Michael Hanselmann
      results.append((status, data))
2154 b247c6fc Michael Hanselmann
2155 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2156 b247c6fc Michael Hanselmann
2157 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2158 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2159 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2160 7b5c4a69 Michael Hanselmann

2161 7b5c4a69 Michael Hanselmann
    @type jobs: list
2162 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2163 7b5c4a69 Michael Hanselmann

2164 7b5c4a69 Michael Hanselmann
    """
2165 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2166 75d81fc8 Michael Hanselmann
2167 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2168 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2169 75d81fc8 Michael Hanselmann

2170 75d81fc8 Michael Hanselmann
    @type jobs: list
2171 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2172 75d81fc8 Michael Hanselmann

2173 75d81fc8 Michael Hanselmann
    """
2174 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2175 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2176 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
2177 7b5c4a69 Michael Hanselmann
2178 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2179 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2180 b95479a5 Michael Hanselmann

2181 b95479a5 Michael Hanselmann
    @type job_id: string
2182 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2183 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2184 b95479a5 Michael Hanselmann

2185 b95479a5 Michael Hanselmann
    """
2186 b95479a5 Michael Hanselmann
    if not isinstance(job_id, basestring):
2187 b95479a5 Michael Hanselmann
      job_id = self._FormatJobID(job_id)
2188 b95479a5 Michael Hanselmann
2189 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2190 b95479a5 Michael Hanselmann
2191 b95479a5 Michael Hanselmann
    # Try to load from disk
2192 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2193 c0f6d0d8 Michael Hanselmann
2194 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Got writable job"
2195 b95479a5 Michael Hanselmann
2196 b95479a5 Michael Hanselmann
    if job:
2197 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2198 b95479a5 Michael Hanselmann
2199 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2200 b95479a5 Michael Hanselmann
2201 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2202 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
2203 ea03467c Iustin Pop
    """Update a job's on disk storage.
2204 ea03467c Iustin Pop

2205 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
2206 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
2207 ea03467c Iustin Pop
    nodes.
2208 ea03467c Iustin Pop

2209 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
2210 ea03467c Iustin Pop
    @param job: the changed job
2211 4c36bdf5 Guido Trotter
    @type replicate: boolean
2212 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
2213 ea03467c Iustin Pop

2214 ea03467c Iustin Pop
    """
2215 66bd7445 Michael Hanselmann
    if __debug__:
2216 66bd7445 Michael Hanselmann
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2217 66bd7445 Michael Hanselmann
      assert (finalized ^ (job.end_timestamp is None))
2218 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't update read-only job"
2219 66bd7445 Michael Hanselmann
2220 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
2221 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
2222 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
2223 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
2224 ac0930b9 Iustin Pop
2225 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2226 5c735209 Iustin Pop
                        timeout):
2227 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
2228 6c5a7090 Michael Hanselmann

2229 6c5a7090 Michael Hanselmann
    @type job_id: string
2230 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
2231 6c5a7090 Michael Hanselmann
    @type fields: list of strings
2232 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
2233 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
2234 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
2235 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
2236 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
2237 5c735209 Iustin Pop
    @type timeout: float
2238 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
2239 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
2240 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
2241 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
2242 ea03467c Iustin Pop

2243 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
2244 ea03467c Iustin Pop
        we instead return a special value,
2245 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2246 ea03467c Iustin Pop
        as such by the clients
2247 6c5a7090 Michael Hanselmann

2248 6c5a7090 Michael Hanselmann
    """
2249 c0f6d0d8 Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2250 c0f6d0d8 Michael Hanselmann
                             writable=False)
2251 989a8bee Michael Hanselmann
2252 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
2253 989a8bee Michael Hanselmann
2254 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
2255 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
2256 dfe57c22 Michael Hanselmann
2257 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2258 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2259 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
2260 188c5e0a Michael Hanselmann
    """Cancels a job.
2261 188c5e0a Michael Hanselmann

2262 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
2263 ea03467c Iustin Pop

2264 188c5e0a Michael Hanselmann
    @type job_id: string
2265 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
2266 188c5e0a Michael Hanselmann

2267 188c5e0a Michael Hanselmann
    """
2268 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
2269 188c5e0a Michael Hanselmann
2270 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2271 188c5e0a Michael Hanselmann
    if not job:
2272 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2273 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2274 fbf0262f Michael Hanselmann
2275 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Can't cancel read-only job"
2276 c0f6d0d8 Michael Hanselmann
2277 099b2870 Michael Hanselmann
    (success, msg) = job.Cancel()
2278 188c5e0a Michael Hanselmann
2279 099b2870 Michael Hanselmann
    if success:
2280 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2281 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2282 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2283 fbf0262f Michael Hanselmann
2284 099b2870 Michael Hanselmann
    return (success, msg)
2285 fbf0262f Michael Hanselmann
2286 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2287 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2288 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2289 c609f802 Michael Hanselmann

2290 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2291 25e7b43f Iustin Pop
    @param jobs: Job objects
2292 d7fd1f28 Michael Hanselmann
    @rtype: int
2293 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2294 c609f802 Michael Hanselmann

2295 c609f802 Michael Hanselmann
    """
2296 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2297 d7fd1f28 Michael Hanselmann
    rename_files = []
2298 d7fd1f28 Michael Hanselmann
    for job in jobs:
2299 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2300 c0f6d0d8 Michael Hanselmann
2301 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2302 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2303 d7fd1f28 Michael Hanselmann
        continue
2304 c609f802 Michael Hanselmann
2305 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2306 c609f802 Michael Hanselmann
2307 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2308 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2309 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2310 c609f802 Michael Hanselmann
2311 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2312 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2313 f1da30e6 Michael Hanselmann
2314 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2315 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2316 d7fd1f28 Michael Hanselmann
2317 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2318 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2319 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2320 20571a26 Guido Trotter
    # archived jobs to fix this.
2321 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2322 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2323 78d12585 Michael Hanselmann
2324 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2325 07cd723a Iustin Pop
  @_RequireOpenQueue
2326 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
2327 07cd723a Iustin Pop
    """Archives a job.
2328 07cd723a Iustin Pop

2329 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2330 ea03467c Iustin Pop

2331 07cd723a Iustin Pop
    @type job_id: string
2332 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
2333 78d12585 Michael Hanselmann
    @rtype: bool
2334 78d12585 Michael Hanselmann
    @return: Whether job was archived
2335 07cd723a Iustin Pop

2336 07cd723a Iustin Pop
    """
2337 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
2338 78d12585 Michael Hanselmann
2339 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2340 78d12585 Michael Hanselmann
    if not job:
2341 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2342 78d12585 Michael Hanselmann
      return False
2343 78d12585 Michael Hanselmann
2344 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
2345 07cd723a Iustin Pop
2346 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2347 07cd723a Iustin Pop
  @_RequireOpenQueue
2348 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
2349 07cd723a Iustin Pop
    """Archives all jobs based on age.
2350 07cd723a Iustin Pop

2351 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
2352 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
2353 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
2354 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
2355 07cd723a Iustin Pop

2356 07cd723a Iustin Pop
    @type age: int
2357 07cd723a Iustin Pop
    @param age: the minimum age in seconds
2358 07cd723a Iustin Pop

2359 07cd723a Iustin Pop
    """
2360 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
2361 07cd723a Iustin Pop
2362 07cd723a Iustin Pop
    now = time.time()
2363 f8ad5591 Michael Hanselmann
    end_time = now + timeout
2364 f8ad5591 Michael Hanselmann
    archived_count = 0
2365 f8ad5591 Michael Hanselmann
    last_touched = 0
2366 f8ad5591 Michael Hanselmann
2367 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
2368 d7fd1f28 Michael Hanselmann
    pending = []
2369 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
2370 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
2371 f8ad5591 Michael Hanselmann
2372 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
2373 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
2374 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
2375 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
2376 f8ad5591 Michael Hanselmann
        break
2377 f8ad5591 Michael Hanselmann
2378 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
2379 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
2380 f8ad5591 Michael Hanselmann
      if job:
2381 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
2382 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
2383 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
2384 f8ad5591 Michael Hanselmann
          else:
2385 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
2386 07cd723a Iustin Pop
        else:
2387 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
2388 f8ad5591 Michael Hanselmann
2389 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
2390 d7fd1f28 Michael Hanselmann
          pending.append(job)
2391 d7fd1f28 Michael Hanselmann
2392 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
2393 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
2394 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
2395 d7fd1f28 Michael Hanselmann
            pending = []
2396 f8ad5591 Michael Hanselmann
2397 d7fd1f28 Michael Hanselmann
    if pending:
2398 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
2399 07cd723a Iustin Pop
2400 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
2401 07cd723a Iustin Pop
2402 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
2403 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
2404 e2715f69 Michael Hanselmann

2405 ea03467c Iustin Pop
    @type job_ids: list
2406 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
2407 ea03467c Iustin Pop
    @type fields: list
2408 ea03467c Iustin Pop
    @param fields: names of fields to return
2409 ea03467c Iustin Pop
    @rtype: list
2410 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
2411 ea03467c Iustin Pop
        the requested fields
2412 e2715f69 Michael Hanselmann

2413 e2715f69 Michael Hanselmann
    """
2414 85f03e0d Michael Hanselmann
    jobs = []
2415 9f7b4967 Guido Trotter
    list_all = False
2416 9f7b4967 Guido Trotter
    if not job_ids:
2417 9f7b4967 Guido Trotter
      # Since files are added to/removed from the queue atomically, there's no
2418 9f7b4967 Guido Trotter
      # risk of getting the job ids in an inconsistent state.
2419 9f7b4967 Guido Trotter
      job_ids = self._GetJobIDsUnlocked()
2420 9f7b4967 Guido Trotter
      list_all = True
2421 e2715f69 Michael Hanselmann
2422 9f7b4967 Guido Trotter
    for job_id in job_ids:
2423 194c8ca4 Michael Hanselmann
      job = self.SafeLoadJobFromDisk(job_id, True)
2424 9f7b4967 Guido Trotter
      if job is not None:
2425 6a290889 Guido Trotter
        jobs.append(job.GetInfo(fields))
2426 9f7b4967 Guido Trotter
      elif not list_all:
2427 9f7b4967 Guido Trotter
        jobs.append(None)
2428 e2715f69 Michael Hanselmann
2429 85f03e0d Michael Hanselmann
    return jobs
2430 e2715f69 Michael Hanselmann
2431 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2432 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2433 e2715f69 Michael Hanselmann
  def Shutdown(self):
2434 e2715f69 Michael Hanselmann
    """Stops the job queue.
2435 e2715f69 Michael Hanselmann

2436 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2437 ea03467c Iustin Pop

2438 e2715f69 Michael Hanselmann
    """
2439 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
2440 85f03e0d Michael Hanselmann
2441 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
2442 a71f9c7d Guido Trotter
    self._queue_filelock = None