Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 3891c95e

History | View | Annotate | Download (72.6 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 b95479a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 b95479a5 Michael Hanselmann
from ganeti import ht
60 a06c6ae8 Michael Hanselmann
from ganeti import query
61 a06c6ae8 Michael Hanselmann
from ganeti import qlang
62 e2715f69 Michael Hanselmann
63 fbf0262f Michael Hanselmann
64 1daae384 Iustin Pop
# Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
65 e2715f69 Michael Hanselmann
66 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
67 ebb80afa Guido Trotter
_LOCK = "_lock"
68 ebb80afa Guido Trotter
# Used by classes keeping the job queue in their "_queue" attribute
_QUEUE = "_queue"
69 99bd4f0a Guido Trotter
70 498ae1cc Iustin Pop
71 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to abort the processing of a job being canceled.

  """
75 fbf0262f Michael Hanselmann
76 fbf0262f Michael Hanselmann
77 70552c46 Michael Hanselmann
def TimeStampNow():
  """Builds a timestamp for the current moment.

  @rtype: tuple
  @return: the current time, as split by L{utils.SplitTime} into the
      (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
85 70552c46 Michael Hanselmann
86 70552c46 Michael Hanselmann
87 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Callable running a fixed-field query against a single job.

  The query object for the requested fields is built once at construction
  time and reused for every call, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Prepares the query object for the given fields.

    @param fields: names of the job fields to be returned by each call

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Queries the cached fields for one job.

    @param job: job object; its C{id} attribute is used as the key
    @return: the first (and only) row produced by the query

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
104 a06c6ae8 Michael Hanselmann
105 a06c6ae8 Michael Hanselmann
106 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
107 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
108 e2715f69 Michael Hanselmann

109 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
110 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
111 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
112 ea03467c Iustin Pop
  @ivar status: the current status
113 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
114 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
115 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
116 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
117 f1048938 Iustin Pop

118 e2715f69 Michael Hanselmann
  """
119 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
120 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
121 66d895a8 Iustin Pop
               "__weakref__"]
122 66d895a8 Iustin Pop
123 85f03e0d Michael Hanselmann
  def __init__(self, op):
124 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
127 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
128 ea03467c Iustin Pop

129 ea03467c Iustin Pop
    """
130 85f03e0d Michael Hanselmann
    self.input = op
131 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
132 85f03e0d Michael Hanselmann
    self.result = None
133 85f03e0d Michael Hanselmann
    self.log = []
134 70552c46 Michael Hanselmann
    self.start_timestamp = None
135 b9b5abcb Iustin Pop
    self.exec_timestamp = None
136 70552c46 Michael Hanselmann
    self.end_timestamp = None
137 f1da30e6 Michael Hanselmann
138 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
139 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
140 8f5c488d Michael Hanselmann
141 f1da30e6 Michael Hanselmann
  @classmethod
142 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
143 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
144 ea03467c Iustin Pop

145 ea03467c Iustin Pop
    @type state: dict
146 ea03467c Iustin Pop
    @param state: the serialized state
147 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
148 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
149 ea03467c Iustin Pop

150 ea03467c Iustin Pop
    """
151 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
152 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
153 85f03e0d Michael Hanselmann
    obj.status = state["status"]
154 85f03e0d Michael Hanselmann
    obj.result = state["result"]
155 85f03e0d Michael Hanselmann
    obj.log = state["log"]
156 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
157 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
158 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
159 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
160 f1da30e6 Michael Hanselmann
    return obj
161 f1da30e6 Michael Hanselmann
162 f1da30e6 Michael Hanselmann
  def Serialize(self):
163 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
164 ea03467c Iustin Pop

165 ea03467c Iustin Pop
    @rtype: dict
166 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
167 ea03467c Iustin Pop

168 ea03467c Iustin Pop
    """
169 6c5a7090 Michael Hanselmann
    return {
170 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
171 6c5a7090 Michael Hanselmann
      "status": self.status,
172 6c5a7090 Michael Hanselmann
      "result": self.result,
173 6c5a7090 Michael Hanselmann
      "log": self.log,
174 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
175 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
176 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
177 8f5c488d Michael Hanselmann
      "priority": self.priority,
178 6c5a7090 Michael Hanselmann
      }
179 f1048938 Iustin Pop
180 e2715f69 Michael Hanselmann
181 e2715f69 Michael Hanselmann
class _QueuedJob(object):
182 e2715f69 Michael Hanselmann
  """In-memory job representation.
183 e2715f69 Michael Hanselmann

184 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
185 ea03467c Iustin Pop
  be taken care of by users of this class.
186 ea03467c Iustin Pop

187 ea03467c Iustin Pop
  @type queue: L{JobQueue}
188 ea03467c Iustin Pop
  @ivar queue: the parent queue
189 ea03467c Iustin Pop
  @ivar id: the job ID
190 ea03467c Iustin Pop
  @type ops: list
191 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
192 ea03467c Iustin Pop
  @type log_serial: int
193 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
194 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
195 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
196 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
197 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
198 e2715f69 Michael Hanselmann

199 e2715f69 Michael Hanselmann
  """
200 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
201 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
202 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
203 c0f6d0d8 Michael Hanselmann
               "__weakref__", "processor_lock", "writable"]
204 66d895a8 Iustin Pop
205 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
206 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
207 ea03467c Iustin Pop

208 ea03467c Iustin Pop
    @type queue: L{JobQueue}
209 ea03467c Iustin Pop
    @param queue: our parent queue
210 ea03467c Iustin Pop
    @type job_id: job_id
211 ea03467c Iustin Pop
    @param job_id: our job id
212 ea03467c Iustin Pop
    @type ops: list
213 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
214 ea03467c Iustin Pop
        in _QueuedOpCodes
215 c0f6d0d8 Michael Hanselmann
    @type writable: bool
216 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
217 ea03467c Iustin Pop

218 ea03467c Iustin Pop
    """
219 e2715f69 Michael Hanselmann
    if not ops:
220 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
221 e2715f69 Michael Hanselmann
222 85f03e0d Michael Hanselmann
    self.queue = queue
223 f1da30e6 Michael Hanselmann
    self.id = job_id
224 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
225 6c5a7090 Michael Hanselmann
    self.log_serial = 0
226 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
227 c56ec146 Iustin Pop
    self.start_timestamp = None
228 c56ec146 Iustin Pop
    self.end_timestamp = None
229 6c5a7090 Michael Hanselmann
230 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
231 fa4aa6b4 Michael Hanselmann
232 fa4aa6b4 Michael Hanselmann
  @staticmethod
233 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
234 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
235 fa4aa6b4 Michael Hanselmann

236 fa4aa6b4 Michael Hanselmann
    """
237 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
238 03b63608 Michael Hanselmann
    obj.ops_iter = None
239 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
240 f8a4adfa Michael Hanselmann
241 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
242 f8a4adfa Michael Hanselmann
    if writable:
243 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
244 f8a4adfa Michael Hanselmann
    else:
245 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
246 be760ba8 Michael Hanselmann
247 9fa2e150 Michael Hanselmann
  def __repr__(self):
248 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
249 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
250 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
251 9fa2e150 Michael Hanselmann
252 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
253 9fa2e150 Michael Hanselmann
254 f1da30e6 Michael Hanselmann
  @classmethod
255 c0f6d0d8 Michael Hanselmann
  def Restore(cls, queue, state, writable):
256 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
257 ea03467c Iustin Pop

258 ea03467c Iustin Pop
    @type queue: L{JobQueue}
259 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
260 ea03467c Iustin Pop
    @type state: dict
261 ea03467c Iustin Pop
    @param state: the serialized state
262 c0f6d0d8 Michael Hanselmann
    @type writable: bool
263 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
264 ea03467c Iustin Pop
    @rtype: _JobQueue
265 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
266 ea03467c Iustin Pop

267 ea03467c Iustin Pop
    """
268 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
269 85f03e0d Michael Hanselmann
    obj.queue = queue
270 85f03e0d Michael Hanselmann
    obj.id = state["id"]
271 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
272 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
273 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
274 6c5a7090 Michael Hanselmann
275 6c5a7090 Michael Hanselmann
    obj.ops = []
276 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
277 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
278 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
279 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
280 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
281 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
282 6c5a7090 Michael Hanselmann
283 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
284 be760ba8 Michael Hanselmann
285 f1da30e6 Michael Hanselmann
    return obj
286 f1da30e6 Michael Hanselmann
287 f1da30e6 Michael Hanselmann
  def Serialize(self):
288 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
289 ea03467c Iustin Pop

290 ea03467c Iustin Pop
    @rtype: dict
291 ea03467c Iustin Pop
    @return: the serialized state
292 ea03467c Iustin Pop

293 ea03467c Iustin Pop
    """
294 f1da30e6 Michael Hanselmann
    return {
295 f1da30e6 Michael Hanselmann
      "id": self.id,
296 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
297 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
298 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
299 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
300 f1da30e6 Michael Hanselmann
      }
301 f1da30e6 Michael Hanselmann
302 85f03e0d Michael Hanselmann
  def CalcStatus(self):
303 ea03467c Iustin Pop
    """Compute the status of this job.
304 ea03467c Iustin Pop

305 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
306 ea03467c Iustin Pop
    based on their status, computes the job status.
307 ea03467c Iustin Pop

308 ea03467c Iustin Pop
    The algorithm is:
309 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
310 ea03467c Iustin Pop
        status will be the same
311 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
312 ea03467c Iustin Pop
          - waitlock
313 fbf0262f Michael Hanselmann
          - canceling
314 ea03467c Iustin Pop
          - running
315 ea03467c Iustin Pop

316 ea03467c Iustin Pop
        will determine the job status
317 ea03467c Iustin Pop

318 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
319 ea03467c Iustin Pop
        and the job status will be the same
320 ea03467c Iustin Pop

321 ea03467c Iustin Pop
    @return: the job status
322 ea03467c Iustin Pop

323 ea03467c Iustin Pop
    """
324 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
325 e2715f69 Michael Hanselmann
326 e2715f69 Michael Hanselmann
    all_success = True
327 85f03e0d Michael Hanselmann
    for op in self.ops:
328 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
329 e2715f69 Michael Hanselmann
        continue
330 e2715f69 Michael Hanselmann
331 e2715f69 Michael Hanselmann
      all_success = False
332 e2715f69 Michael Hanselmann
333 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
334 e2715f69 Michael Hanselmann
        pass
335 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
336 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
337 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
338 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
339 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
340 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
341 fbf0262f Michael Hanselmann
        break
342 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
343 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
344 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
345 f1da30e6 Michael Hanselmann
        break
346 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
347 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
348 4cb1d919 Michael Hanselmann
        break
349 e2715f69 Michael Hanselmann
350 e2715f69 Michael Hanselmann
    if all_success:
351 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
352 e2715f69 Michael Hanselmann
353 e2715f69 Michael Hanselmann
    return status
354 e2715f69 Michael Hanselmann
355 8f5c488d Michael Hanselmann
  def CalcPriority(self):
356 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
357 8f5c488d Michael Hanselmann

358 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
359 8f5c488d Michael Hanselmann
    priority is used.
360 8f5c488d Michael Hanselmann

361 8f5c488d Michael Hanselmann
    @rtype: int
362 8f5c488d Michael Hanselmann

363 8f5c488d Michael Hanselmann
    """
364 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
365 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
366 8f5c488d Michael Hanselmann
367 8f5c488d Michael Hanselmann
    if not priorities:
368 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
369 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
370 8f5c488d Michael Hanselmann
371 8f5c488d Michael Hanselmann
    return min(priorities)
372 8f5c488d Michael Hanselmann
373 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
374 ea03467c Iustin Pop
    """Selectively returns the log entries.
375 ea03467c Iustin Pop

376 ea03467c Iustin Pop
    @type newer_than: None or int
377 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
378 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
379 ea03467c Iustin Pop
        than this value
380 ea03467c Iustin Pop
    @rtype: list
381 ea03467c Iustin Pop
    @return: the list of the log entries selected
382 ea03467c Iustin Pop

383 ea03467c Iustin Pop
    """
384 6c5a7090 Michael Hanselmann
    if newer_than is None:
385 6c5a7090 Michael Hanselmann
      serial = -1
386 6c5a7090 Michael Hanselmann
    else:
387 6c5a7090 Michael Hanselmann
      serial = newer_than
388 6c5a7090 Michael Hanselmann
389 6c5a7090 Michael Hanselmann
    entries = []
390 6c5a7090 Michael Hanselmann
    for op in self.ops:
391 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
392 6c5a7090 Michael Hanselmann
393 6c5a7090 Michael Hanselmann
    return entries
394 6c5a7090 Michael Hanselmann
395 6a290889 Guido Trotter
  def GetInfo(self, fields):
396 6a290889 Guido Trotter
    """Returns information about a job.
397 6a290889 Guido Trotter

398 6a290889 Guido Trotter
    @type fields: list
399 6a290889 Guido Trotter
    @param fields: names of fields to return
400 6a290889 Guido Trotter
    @rtype: list
401 6a290889 Guido Trotter
    @return: list with one element for each field
402 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
403 6a290889 Guido Trotter
        has been passed
404 6a290889 Guido Trotter

405 6a290889 Guido Trotter
    """
406 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
407 6a290889 Guido Trotter
408 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
409 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
410 34327f51 Iustin Pop

411 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
412 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
413 34327f51 Iustin Pop
    finalised are not changed.
414 34327f51 Iustin Pop

415 34327f51 Iustin Pop
    @param status: a given opcode status
416 34327f51 Iustin Pop
    @param result: the opcode result
417 34327f51 Iustin Pop

418 34327f51 Iustin Pop
    """
419 747f6113 Michael Hanselmann
    not_marked = True
420 747f6113 Michael Hanselmann
    for op in self.ops:
421 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
422 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
423 747f6113 Michael Hanselmann
        continue
424 747f6113 Michael Hanselmann
      op.status = status
425 747f6113 Michael Hanselmann
      op.result = result
426 747f6113 Michael Hanselmann
      not_marked = False
427 34327f51 Iustin Pop
428 66bd7445 Michael Hanselmann
  def Finalize(self):
429 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
430 66bd7445 Michael Hanselmann

431 66bd7445 Michael Hanselmann
    """
432 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
433 66bd7445 Michael Hanselmann
434 099b2870 Michael Hanselmann
  def Cancel(self):
435 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
436 a0d2fe2c Michael Hanselmann

437 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
438 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
439 a0d2fe2c Michael Hanselmann
      as canceling and a text message
440 a0d2fe2c Michael Hanselmann

441 a0d2fe2c Michael Hanselmann
    """
442 099b2870 Michael Hanselmann
    status = self.CalcStatus()
443 099b2870 Michael Hanselmann
444 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
445 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446 099b2870 Michael Hanselmann
                             "Job canceled by request")
447 66bd7445 Michael Hanselmann
      self.Finalize()
448 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
449 099b2870 Michael Hanselmann
450 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
451 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
452 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
454 099b2870 Michael Hanselmann
455 86b16e9d Michael Hanselmann
    else:
456 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
457 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
458 099b2870 Michael Hanselmann
459 f1048938 Iustin Pop
460 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks connecting the execution of one opcode to its job and queue.

  """
  def __init__(self, queue, job, op):
    """Stores the objects the callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts via L{CancelJob} if the opcode was marked as canceling.

    @raise CancelJob: when the opcode status is C{OP_STATUS_CANCELING}

    """
    if self._op.status != constants.OP_STATUS_CANCELING:
      # Nobody asked for cancellation, carry on
      return

    logging.debug("Canceling opcode")
    raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from lock-waiting to running.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    @raise CancelJob: when the opcode was marked as canceling in the meantime

    """
    opcode = self._op
    job = self._job

    assert opcode in job.ops
    assert opcode.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING)

    # A cancellation request takes precedence over starting the opcode
    self._CheckCancel()

    logging.debug("Opcode is now running")

    opcode.status = constants.OP_STATUS_RUNNING
    opcode.exec_timestamp = TimeStampNow()

    # The new status has to be written out and replicated
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds one entry to the opcode's log while holding the queue lock.

    @param timestamp: time of the log entry
    @param log_type: type of the log entry
    @param log_msg: the message itself

    """
    job = self._job
    serial = job.log_serial + 1
    job.log_serial = serial

    self._op.log.append((serial, timestamp, log_type, log_msg))

    # Log entries are written out but not replicated
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Appends a log entry for the current opcode.

    Accepts either a single argument, the message, which is logged with type
    C{ELOG_MESSAGE}, or two arguments, the log type and the message.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # TimeStampNow returns the time in split form, which makes
    # serialization easier and doesn't lose precision.
    self._AppendFeedback(TimeStampNow(), log_type, log_msg)

  def CheckCancel(self):
    """Checks whether the job has been cancelled.

    @raise CancelJob: when the opcode was marked as canceling

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Abort right here if cancellation was requested
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Hands jobs over to the job queue for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue takes care of it
    return self._queue.SubmitManyJobs(jobs)
558 6a373640 Michael Hanselmann
559 031a3e57 Michael Hanselmann
560 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Sets up the checker.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @param prev_job_info: previous job info, as passed by the LUXI client
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_log_serial = prev_log_serial
    self._prev_job_info = prev_job_info

  def __call__(self, job):
    """Determines whether the job differs from what the client last saw.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: tuple of C{(job_info, log_entries)} if the job is no longer
      queued/waiting/running, its info changed or there are new log
      entries; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    raw_info = self._squery(job)
    raw_log = job.GetLogEntries(self._prev_log_serial)

    # A round-trip through the serializer is done on purpose: serializing
    # and deserializing can change types (e.g. tuple to list) or lose
    # precision, and the client's data went through exactly that. Without
    # it the comparison below could report differences which aren't
    # significant.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(raw_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(raw_log))

    # A job which is no longer running won't change anymore, hence waiting
    # for it is pointless and it's always reported
    job_done = status not in (constants.JOB_STATUS_QUEUED,
                              constants.JOB_STATUS_RUNNING,
                              constants.JOB_STATUS_WAITING)
    info_changed = job_info != self._prev_job_info
    has_new_log = (log_entries and
                   self._prev_log_serial != log_entries[0][0])

    if not (job_done or info_changed or has_new_log):
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
612 989a8bee Michael Hanselmann
613 989a8bee Michael Hanselmann
614 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Creates the inotify machinery for a single job file.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    watch_manager = pyinotify.WatchManager()
    self._wm = watch_manager

    handler = asyncnotifier.SingleFileEventHandler(watch_manager,
                                                   self._OnInotify, filename)
    self._inotify_handler = handler

    self._notifier = pyinotify.Notifier(watch_manager,
                                        default_proc_fun=handler)

    try:
      handler.enable()
    except Exception:
      # The notifier's file descriptor isn't closed automatically by
      # pyinotify, so it has to be done here before giving up
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Inotify callback; re-enables the handler if it was disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Blocks until the job file changes or the timeout expires.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # Timeout is scaled from seconds by a factor of 1000 for the notifier
    have_events = notifier.check_events(timeout * 1000)
    if have_events:
      notifier.read_events()

    # Events are processed unconditionally
    notifier.process_events()

    return have_events

  def Close(self):
    """Stops the notifier, thereby closing its file descriptor.

    """
    self._notifier.stop()
662 989a8bee Michael Hanselmann
663 989a8bee Michael Hanselmann
664 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Remembers the job file; the actual waiter is created on demand.

    @type filename: string
    @param filename: Path to job file

    """
    self._filename = filename
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events; always C{True} on the call
      which sets up the file waiter

    """
    filewaiter = self._filewaiter

    if not filewaiter:
      # Lazy setup: the inotify setup cost is avoided when the job file has
      # already changed. On reaching this point the waiter is created and
      # control goes straight back to the caller, which then checks the job
      # file once more in case it changed since the last check. This avoids
      # a race condition.
      self._filewaiter = _JobFileChangesWaiter(self._filename)
      return True

    return filewaiter.Wait(timeout)

  def Close(self):
    """Closes the file waiter, if one was ever created.

    """
    filewaiter = self._filewaiter
    if filewaiter:
      filewaiter.Close()
700 989a8bee Michael Hanselmann
701 989a8bee Michael Hanselmann
702 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the job status and log serial a client has seen last, instances
  report back once the current state of the job differs from that.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job and runs the change checker on it once.

    @param counter: iterator whose values tell how many checks were done
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: called with the job, returns C{None} if unchanged
    @return: the (non-C{None}) result of C{check_fn}
    @raise errors.JobLost: if the job couldn't be loaded
    @raise utils.RetryAgain: if the job hasn't changed

    """
    attempt = counter.next()
    if attempt > 0:
      # Not the first check: the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is not None:
      return result

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: the checker's result if the job changed, C{None} if the job
      was lost or inotify failed, L{constants.JOB_NOTCHANGED} on timeout

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        poll_fn = compat.partial(self._CheckForChanges,
                                 counter, job_load_fn, check_fn)
        return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # Release the waiter no matter how the retry loop ended
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
760 6c2549d6 Guido Trotter
761 6c2549d6 Guido Trotter
762 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Instances of L{errors.GenericError} are encoded as they are; anything else
  is first converted to an L{errors.OpExecError} carrying C{str(err)}.

  @param err: the exception to encode
  @return: result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
772 6760e4ed Michael Hanselmann
773 6760e4ed Michael Hanselmann
774 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Wraps a callable producing timeouts, adding look-ahead.

    @type fn: callable
    @param fn: called without arguments whenever a new timeout is needed

    """
    self._next = None
    self._fn = fn

  def _Advance(self):
    """Fetches a timeout from the callable unless one is already buffered.

    A buffered value of C{None} counts as "nothing buffered", i.e. the
    callable is asked again in that case.

    """
    if self._next is not None:
      return

    self._next = self._fn()

  def Peek(self):
    """Returns the upcoming timeout while keeping it buffered.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the upcoming timeout and removes it from the buffer.

    """
    self._Advance()
    (current, self._next) = (self._next, None)
    return current
804 26d3fd2f Michael Hanselmann
805 26d3fd2f Michael Hanselmann
806 b80cc518 Michael Hanselmann
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Sets up the execution context for one opcode.

    @param op: queued opcode; its C{input} attribute must provide
      C{Summary()}
    @param index: position of the opcode
    @param log_prefix: prefix for log messages concerning this opcode
    @param timeout_strategy_factory: callable creating an object with a
      C{NextAttempt} method

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Dependencies are kept as a local copy, it's modified later on
    self.jobdeps = None
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Starts over with a fresh timeout strategy from the factory.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: C{True} if the opcode's priority was raised (in which case the
      timeout strategy has been reset as well), C{False} otherwise

    """
    # Are there retries left, i.e. would the next round still use a timeout
    # instead of a blocking acquire?
    if self._timeout_strategy.Peek() is not None:
      return False

    op = self.op

    # Already at the highest priority, nothing left to increase
    if op.priority <= constants.OP_PRIO_HIGHEST:
      return False

    logging.debug("Increasing priority")
    op.priority -= 1
    self._ResetTimeoutStrategy()
    return True

  def GetNextLockTimeout(self):
    """Returns the timeout to use for the next lock acquire attempt.

    """
    return self._timeout_strategy.Next()
856 26d3fd2f Michael Hanselmann
857 b80cc518 Michael Hanselmann
858 be760ba8 Michael Hanselmann
class _JobProcessor(object):
859 75d81fc8 Michael Hanselmann
  (DEFER,
860 75d81fc8 Michael Hanselmann
   WAITDEP,
861 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
862 75d81fc8 Michael Hanselmann
863 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores what is needed to process a job.

    @param queue: queue object the job belongs to
    @param opexec_fn: callable used for executing opcodes
    @param job: the job to process
    @param _timeout_strategy_factory: callable creating lock timeout
      strategies

    """
    self._timeout_strategy_factory = _timeout_strategy_factory
    self.job = job
    self.opexec_fn = opexec_fn
    self.queue = queue
872 be760ba8 Michael Hanselmann
873 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: context for the first opcode which isn't finalized yet
    @raise errors.ProgrammerError: if there are no opcodes left or an opcode
      is found in running state

    """
    # The iterator is stored on the job and serves as some sort of a cache,
    # future lookups continue where the previous one stopped
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
      opctx = _OpExecContext(op, idx, log_prefix, timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

    # Iterator exhausted without finding anything to run
    raise errors.ProgrammerError("Called for a finished job")
912 be760ba8 Michael Hanselmann
913 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or one of the timestamps was modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    status_changed = (op.status == constants.OP_STATUS_QUEUED)
    if status_changed:
      op.status = constants.OP_STATUS_WAITING

    op_stamped = (op.start_timestamp is None)
    if op_stamped:
      op.start_timestamp = TimeStampNow()

    # The job starts when its first opcode does
    job_stamped = (job.start_timestamp is None)
    if job_stamped:
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return status_changed or op_stamped or job_stamped
948 be760ba8 Michael Hanselmann
949 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are handled one by one, from the front of
    C{opctx.jobdeps}; fulfilled ones are removed from that list.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker
    @raise errors.ProgrammerError: on an unknown result from the dependency
      manager

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Dependency is fulfilled, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (_JobDependencyManager.WRONGSTATUS,
                       _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No (more) dependencies
    return False
1004 b95479a5 Michael Hanselmann
1005 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1006 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1007 be760ba8 Michael Hanselmann

1008 be760ba8 Michael Hanselmann
    """
1009 b80cc518 Michael Hanselmann
    op = opctx.op
1010 b80cc518 Michael Hanselmann
1011 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1012 be760ba8 Michael Hanselmann
1013 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1014 26d3fd2f Michael Hanselmann
1015 be760ba8 Michael Hanselmann
    try:
1016 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1017 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1018 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1019 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1020 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1021 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1022 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1023 9e49dfc5 Michael Hanselmann
1024 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1025 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1026 9e49dfc5 Michael Hanselmann
1027 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1028 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1029 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1030 9e49dfc5 Michael Hanselmann
1031 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1032 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1033 be760ba8 Michael Hanselmann
    except CancelJob:
1034 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1035 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1036 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1037 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1038 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1039 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1040 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1041 be760ba8 Michael Hanselmann
    else:
1042 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1043 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1044 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1045 be760ba8 Michael Hanselmann
1046 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1047 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1048 be760ba8 Michael Hanselmann

1049 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1050 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1051 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1052 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1053 be760ba8 Michael Hanselmann

1054 be760ba8 Michael Hanselmann
    """
1055 be760ba8 Michael Hanselmann
    queue = self.queue
1056 be760ba8 Michael Hanselmann
    job = self.job
1057 be760ba8 Michael Hanselmann
1058 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1059 be760ba8 Michael Hanselmann
1060 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1061 be760ba8 Michael Hanselmann
    try:
1062 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1063 be760ba8 Michael Hanselmann
1064 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1065 c0f6d0d8 Michael Hanselmann
1066 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1067 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1068 75d81fc8 Michael Hanselmann
        return self.FINISHED
1069 66bd7445 Michael Hanselmann
1070 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1071 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1072 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1073 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1074 26d3fd2f Michael Hanselmann
      else:
1075 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1076 26d3fd2f Michael Hanselmann
          _nextop_fn()
1077 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1078 26d3fd2f Michael Hanselmann
1079 b80cc518 Michael Hanselmann
      op = opctx.op
1080 be760ba8 Michael Hanselmann
1081 be760ba8 Michael Hanselmann
      # Consistency check
1082 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1083 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1084 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1085 be760ba8 Michael Hanselmann
1086 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1087 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1088 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1089 be760ba8 Michael Hanselmann
1090 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1091 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1092 26d3fd2f Michael Hanselmann
1093 b95479a5 Michael Hanselmann
      waitjob = None
1094 b95479a5 Michael Hanselmann
1095 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1096 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1097 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1098 30c945d0 Michael Hanselmann
1099 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1100 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1101 5fd6b694 Michael Hanselmann
          # Write to disk
1102 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1103 be760ba8 Michael Hanselmann
1104 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1105 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1106 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1107 b95479a5 Michael Hanselmann
        assert waitjob is None
1108 b95479a5 Michael Hanselmann
1109 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1110 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1111 be760ba8 Michael Hanselmann
1112 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1113 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1114 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1115 be760ba8 Michael Hanselmann
1116 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1117 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1118 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1119 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1120 be760ba8 Michael Hanselmann
1121 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1122 b95479a5 Michael Hanselmann
1123 b95479a5 Michael Hanselmann
          queue.release()
1124 b95479a5 Michael Hanselmann
          try:
1125 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1126 b95479a5 Michael Hanselmann
          finally:
1127 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1128 b95479a5 Michael Hanselmann
1129 b95479a5 Michael Hanselmann
          op.status = op_status
1130 b95479a5 Michael Hanselmann
          op.result = op_result
1131 b95479a5 Michael Hanselmann
1132 b95479a5 Michael Hanselmann
          assert not waitjob
1133 be760ba8 Michael Hanselmann
1134 47099cd1 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITING:
1135 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1136 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1137 be760ba8 Michael Hanselmann
        else:
1138 26d3fd2f Michael Hanselmann
          # Finalize opcode
1139 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1140 be760ba8 Michael Hanselmann
1141 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1142 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1143 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1144 26d3fd2f Michael Hanselmann
          else:
1145 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1146 be760ba8 Michael Hanselmann
1147 47099cd1 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1148 be760ba8 Michael Hanselmann
        finalize = False
1149 be760ba8 Michael Hanselmann
1150 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1151 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1152 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1153 be760ba8 Michael Hanselmann
1154 26d3fd2f Michael Hanselmann
        # Keep around for another round
1155 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1156 be760ba8 Michael Hanselmann
1157 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1158 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1159 be760ba8 Michael Hanselmann
1160 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1161 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1162 be760ba8 Michael Hanselmann
1163 be760ba8 Michael Hanselmann
      else:
1164 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1165 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1166 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1167 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1168 26d3fd2f Michael Hanselmann
1169 26d3fd2f Michael Hanselmann
        # Reset context
1170 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1171 26d3fd2f Michael Hanselmann
1172 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1173 26d3fd2f Michael Hanselmann
          finalize = False
1174 26d3fd2f Michael Hanselmann
1175 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1176 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1177 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1178 26d3fd2f Michael Hanselmann
1179 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1180 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1181 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1182 26d3fd2f Michael Hanselmann
          finalize = True
1183 be760ba8 Michael Hanselmann
1184 26d3fd2f Michael Hanselmann
          # Consistency check
1185 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1186 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1187 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1188 be760ba8 Michael Hanselmann
1189 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1190 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1191 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1192 26d3fd2f Michael Hanselmann
          finalize = True
1193 26d3fd2f Michael Hanselmann
1194 26d3fd2f Michael Hanselmann
        else:
1195 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1196 26d3fd2f Michael Hanselmann
1197 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1198 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1199 66bd7445 Michael Hanselmann
          finalize = True
1200 66bd7445 Michael Hanselmann
1201 66bd7445 Michael Hanselmann
        if finalize:
1202 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1203 66bd7445 Michael Hanselmann
          job.Finalize()
1204 26d3fd2f Michael Hanselmann
1205 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1206 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1207 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1208 be760ba8 Michael Hanselmann
1209 b95479a5 Michael Hanselmann
        assert not waitjob
1210 b95479a5 Michael Hanselmann
1211 66bd7445 Michael Hanselmann
        if finalize:
1212 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1213 75d81fc8 Michael Hanselmann
          return self.FINISHED
1214 be760ba8 Michael Hanselmann
1215 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1216 b95479a5 Michael Hanselmann
1217 75d81fc8 Michael Hanselmann
      if waitjob:
1218 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1219 75d81fc8 Michael Hanselmann
      else:
1220 75d81fc8 Michael Hanselmann
        return self.DEFER
1221 be760ba8 Michael Hanselmann
    finally:
1222 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1223 be760ba8 Michael Hanselmann
      queue.release()
1224 be760ba8 Michael Hanselmann
1225 be760ba8 Michael Hanselmann
1226 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts on the status a L{_JobProcessor} returned for a job.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    with the job's ID when the job has finished
  @param job: Job the result belongs to
  @param result: Status returned by the job processor, one of
    C{_JobProcessor.FINISHED}, C{_JobProcessor.DEFER} or
    C{_JobProcessor.WAITDEP}
  @raise workerpool.DeferTask: if the status is C{DEFER}; carries the
    priority reported by C{job.CalcPriority()}
  @raise errors.ProgrammerError: if the status is none of the known values

  """
  if result == _JobProcessor.FINISHED:
    # Job is done, tell everyone waiting for it
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Job needs another round, schedule it again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do, re-scheduling is left to the dependency manager
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1247 df5a5730 Michael Hanselmann
1248 df5a5730 Michael Hanselmann
1249 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker class processing jobs taken from the job queue.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding the job's processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @return: Whatever L{_RunTaskInner} returns

    """
    assert job.writable, "Expected writable job"

    # A job which registered for a dependency may be notified by that
    # dependency before the worker which registered it is done. The job can
    # then be in the tasklist more than once, hence the per-job lock ensuring
    # only one worker is active on a single job.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor once for a job and evaluates its result.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Sets the task name for this job and the given opcode (or None).

      """
      return self.SetTaskName(self._GetWorkerName(job, op))

    # No opcode is being executed yet
    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcode executor which also keeps the thread name up to date
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               proc.ExecOpCode)

    depmgr = queue.depmgr
    processor = _JobProcessor(queue, execop_fn, job)

    _EvaluateJobProcessorResult(depmgr, job, processor())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the worker thread name reflecting it.

    The name callable is invoked with the opcode before execution and with
    C{None} afterwards, no matter whether the execution succeeded.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode; passed to both callables
    @return: Return value of C{execop_fn}

    """
    setname_fn(op)
    try:
      result = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

    return result

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @return: C{Job} followed by the job ID; if C{op} evaluates to true, a
      slash and the opcode's C{TinySummary()} are appended

    """
    name = "Job%s" % job.id

    if not op:
      return name

    return "/".join([name, op.TinySummary()])
1321 0aeeb6e3 Michael Hanselmann
1322 e2715f69 Michael Hanselmann
1323 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool using L{_JobQueueWorker} as its worker class.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it serves.

    @param queue: Job queue object; stored as C{queue} attribute, which
      L{_JobQueueWorker} reads via its C{pool} attribute

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    self.queue = queue
1332 e2715f69 Michael Hanselmann
1333 e2715f69 Michael Hanselmann
1334 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1335 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1336 b95479a5 Michael Hanselmann

1337 b95479a5 Michael Hanselmann
  """
1338 b95479a5 Michael Hanselmann
  (WAIT,
1339 b95479a5 Michael Hanselmann
   ERROR,
1340 b95479a5 Michael Hanselmann
   CANCEL,
1341 b95479a5 Michael Hanselmann
   CONTINUE,
1342 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1343 b95479a5 Michael Hanselmann
1344 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1345 b95479a5 Michael Hanselmann
    """Initializes this class.
1346 b95479a5 Michael Hanselmann

1347 b95479a5 Michael Hanselmann
    """
1348 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1349 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1350 b95479a5 Michael Hanselmann
1351 b95479a5 Michael Hanselmann
    self._waiters = {}
1352 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1353 b95479a5 Michael Hanselmann
1354 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1355 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1356 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1357 fcb21ad7 Michael Hanselmann

1358 fcb21ad7 Michael Hanselmann
    @type requested: set
1359 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1360 fcb21ad7 Michael Hanselmann

1361 fcb21ad7 Michael Hanselmann
    """
1362 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1363 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1364 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1365 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1366 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1367 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1368 fcb21ad7 Michael Hanselmann
            if waiters]
1369 fcb21ad7 Michael Hanselmann
1370 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1371 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1372 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1373 b95479a5 Michael Hanselmann

1374 b95479a5 Michael Hanselmann
    """
1375 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1376 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1377 b95479a5 Michael Hanselmann
1378 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1379 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1380 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1381 b95479a5 Michael Hanselmann

1382 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1383 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1384 b95479a5 Michael Hanselmann

1385 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1386 b95479a5 Michael Hanselmann
    @param job: Job object
1387 b95479a5 Michael Hanselmann
    @type dep_job_id: string
1388 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1389 b95479a5 Michael Hanselmann
    @type dep_status: list
1390 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1391 b95479a5 Michael Hanselmann

1392 b95479a5 Michael Hanselmann
    """
1393 b95479a5 Michael Hanselmann
    assert ht.TString(job.id)
1394 b95479a5 Michael Hanselmann
    assert ht.TString(dep_job_id)
1395 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1396 b95479a5 Michael Hanselmann
1397 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1398 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1399 b95479a5 Michael Hanselmann
1400 b95479a5 Michael Hanselmann
    # Get status of dependency job
1401 b95479a5 Michael Hanselmann
    try:
1402 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1403 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1404 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1405 b95479a5 Michael Hanselmann
1406 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1407 b95479a5 Michael Hanselmann
1408 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1409 b95479a5 Michael Hanselmann
1410 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1411 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1412 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1413 b95479a5 Michael Hanselmann
      return (self.WAIT,
1414 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1415 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1416 b95479a5 Michael Hanselmann
1417 b95479a5 Michael Hanselmann
    # Remove from waiters list
1418 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1419 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1420 b95479a5 Michael Hanselmann
1421 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1422 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1423 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1424 b95479a5 Michael Hanselmann
1425 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1426 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1427 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1428 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1429 b95479a5 Michael Hanselmann
1430 b95479a5 Michael Hanselmann
    else:
1431 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1432 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1433 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1434 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1435 b95479a5 Michael Hanselmann
1436 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1437 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1438 37d76f1e Michael Hanselmann

1439 37d76f1e Michael Hanselmann
    """
1440 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1441 37d76f1e Michael Hanselmann
                   if not waiters]:
1442 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1443 37d76f1e Michael Hanselmann
1444 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1445 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1446 b95479a5 Michael Hanselmann

1447 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1448 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1449 b95479a5 Michael Hanselmann
    @type job_id: string
1450 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1451 b95479a5 Michael Hanselmann

1452 b95479a5 Michael Hanselmann
    """
1453 b95479a5 Michael Hanselmann
    assert ht.TString(job_id)
1454 b95479a5 Michael Hanselmann
1455 37d76f1e Michael Hanselmann
    self._lock.acquire()
1456 37d76f1e Michael Hanselmann
    try:
1457 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1458 37d76f1e Michael Hanselmann
1459 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1460 37d76f1e Michael Hanselmann
    finally:
1461 37d76f1e Michael Hanselmann
      self._lock.release()
1462 37d76f1e Michael Hanselmann
1463 b95479a5 Michael Hanselmann
    if jobs:
1464 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1465 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1466 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1467 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1468 b95479a5 Michael Hanselmann
1469 b95479a5 Michael Hanselmann
1470 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1471 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1472 ea03467c Iustin Pop

1473 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1474 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1475 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1476 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1477 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1478 ea03467c Iustin Pop

1479 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1480 f1da30e6 Michael Hanselmann

1481 6c881c52 Iustin Pop
  Example::
1482 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1483 6c881c52 Iustin Pop
    @_RequireOpenQueue
1484 6c881c52 Iustin Pop
    def Example(self):
1485 6c881c52 Iustin Pop
      pass
1486 db37da70 Michael Hanselmann

1487 6c881c52 Iustin Pop
  """
1488 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1489 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1490 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1491 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1492 6c881c52 Iustin Pop
  return wrapper
1493 db37da70 Michael Hanselmann
1494 db37da70 Michael Hanselmann
1495 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1496 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1497 c8d0be94 Michael Hanselmann

1498 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1499 c8d0be94 Michael Hanselmann

1500 c8d0be94 Michael Hanselmann
  """
1501 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1502 c8d0be94 Michael Hanselmann
    """Wrapper function.
1503 c8d0be94 Michael Hanselmann

1504 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1505 c8d0be94 Michael Hanselmann

1506 c8d0be94 Michael Hanselmann
    """
1507 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1508 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1509 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1510 c8d0be94 Michael Hanselmann
    if self._drained:
1511 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1512 6d5ea385 Michael Hanselmann
1513 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1514 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1515 6d5ea385 Michael Hanselmann
1516 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1517 c8d0be94 Michael Hanselmann
  return wrapper
1518 c8d0be94 Michael Hanselmann
1519 c8d0be94 Michael Hanselmann
1520 6c881c52 Iustin Pop
class JobQueue(object):
1521 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1522 db37da70 Michael Hanselmann

1523 6c881c52 Iustin Pop
  """
1524 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # Cache of job objects indexed by job ID; values are held weakly, so an
    # entry disappears once nothing else references the job
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it, be it in shared or in
    # exclusive mode.
    self._lock = locking.SharedLock("JobQueue")

    # Expose the lock operations directly on the queue object
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (name -> primary IP, master candidates only)
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # The size is computed from the job files found on disk
    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      # Deliberately catching everything: the workers must be stopped before
      # the error is passed on to the caller
      self._wpool.TerminateWorkers()
      raise
1590 711b5124 Michael Hanselmann
1591 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Walks over all jobs on disk and resumes the unfinished ones.

    Queued jobs are scheduled again. Jobs found in waiting state get their
    unfinished opcodes reset to queued and are scheduled again, while jobs
    found running or canceling get their unfinished opcodes marked as errors
    and are finalized.

    The lock is needed here because WorkerPool.AddTask() may start a job
    while this function is still doing its work.

    """
    logging.info("Inspecting job queue")

    pending = []

    job_ids = self._GetJobIDsUnlocked()
    total = len(job_ids)
    last_report = time.time()

    for (pos, job_id) in enumerate(job_ids):
      # Give an update every 1000 jobs or 10 seconds, and for the last job
      report_due = (pos % 1000 == 0 or
                    time.time() >= (last_report + 10.0) or
                    pos == (total - 1))
      if report_due:
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     pos, total - 1, 100.0 * (pos + 1) / total)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)
      if job is None:
        # A failure in loading the job causes 'None' to be returned
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        pending.append(job)
        continue

      if status not in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING):
        # Nothing to do for jobs in any other state
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status != constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()
      else:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        pending.append(job)

      self.UpdateJobUnlocked(job)

    if pending:
      logging.info("Restarting %s jobs", len(pending))
      self._EnqueueJobsUnlocked(pending)

    logging.info("Job queue inspection finished")
1647 85f03e0d Michael Hanselmann
1648 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Builds an RPC runner bound to this queue's context.

    @param address_list: passed unchanged to L{rpc.JobQueueRunner}
    @return: the newly created runner

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1653 fb1ffbca Michael Hanselmann
1654 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Registers a new node with the queue.

    The queue directory on the node is purged first. Nodes which are not
    master candidates are then dropped from the list of known nodes; master
    candidates receive a copy of all non-archived job files and of the
    serial file before being added to the list.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = self._GetRpc(None).call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # Remove if existing, ignoring errors, and skip the replication of the
      # job files
      self._nodes.pop(name, None)
      return

    # Upload the whole queue excluding archived jobs, followed by the current
    # serial file
    to_upload = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addresses = [node.primary_ip]

    for path in to_upload:
      # Read file content
      payload = utils.ReadFile(path)

      runner = self._GetRpc(addresses)
      upload_error = \
        runner.call_jobqueue_update([name], path, payload)[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    self._nodes[name] = node.primary_ip
1700 d2e03a33 Michael Hanselmann
1701 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback invoked when a node is removed from the cluster.

    Names which are not in the list of known nodes are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not registered with the queue
      pass
1711 23752136 Michael Hanselmann
1712 7e950d31 Iustin Pop
  @staticmethod
1713 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1714 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1715 ea03467c Iustin Pop

1716 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1717 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1718 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1719 ea03467c Iustin Pop

1720 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1721 ea03467c Iustin Pop
    @type nodes: list
1722 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1723 ea03467c Iustin Pop
    @type failmsg: str
1724 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1725 ea03467c Iustin Pop

1726 ea03467c Iustin Pop
    """
1727 e74798c1 Michael Hanselmann
    failed = []
1728 e74798c1 Michael Hanselmann
    success = []
1729 e74798c1 Michael Hanselmann
1730 e74798c1 Michael Hanselmann
    for node in nodes:
1731 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1732 c8457ce7 Iustin Pop
      if msg:
1733 e74798c1 Michael Hanselmann
        failed.append(node)
1734 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1735 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1736 c8457ce7 Iustin Pop
      else:
1737 c8457ce7 Iustin Pop
        success.append(node)
1738 e74798c1 Michael Hanselmann
1739 e74798c1 Michael Hanselmann
    # +1 for the master node
1740 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1741 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1742 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1743 e74798c1 Michael Hanselmann
1744 99aabbed Iustin Pop
  def _GetNodeIp(self):
1745 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1746 99aabbed Iustin Pop

1747 ea03467c Iustin Pop
    @rtype: (list, list)
1748 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1749 ea03467c Iustin Pop
        names and the second one with the node addresses
1750 ea03467c Iustin Pop

1751 99aabbed Iustin Pop
    """
1752 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1753 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1754 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1755 99aabbed Iustin Pop
    return name_list, addr_list
1756 99aabbed Iustin Pop
1757 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and optionally replicates it to all nodes.

    The file is first (re)written on the local node, owned by the master
    daemon's user and group. If requested, the new contents are then sent
    to all other nodes known to the queue and the RPC results are checked.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.masterd_gid)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    runner = self._GetRpc(addrs)
    result = runner.call_jobqueue_update(names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1779 23752136 Michael Hanselmann
1780 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are first done in the local queue directory (creating
    missing target directories), then the same list is sent to all other
    nodes known to the queue and the RPC results are checked.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first ...
    for (old_path, new_path) in rename:
      utils.RenameFile(old_path, new_path, mkdir=True)

    # ... then the same on all other nodes
    (names, addrs) = self._GetNodeIp()
    runner = self._GetRpc(addrs)
    result = runner.call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1798 abc1f2ce Michael Hanselmann
1799 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    highest serial is written to the serial file (and replicated) before
    the in-memory counter is updated.

    @type count: integer
    @param count: how many serials to return
    @rtype: list
    @return: a list of C{count} job identifiers, each as returned by
        L{jstore.FormatJobID}

    """
    assert ht.TPositiveInt(count)

    # Range of new numbers
    first = self._last_serial + 1
    last = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % last, True)

    job_ids = [jstore.FormatJobID(number)
               for number in range(first, last + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = last

    assert len(job_ids) == count

    return job_ids
1828 f1da30e6 Michael Hanselmann
1829 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Computes the path of the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1840 f1da30e6 Michael Hanselmann
1841 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Computes the path of the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    subdir = jstore.GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, subdir, file_name)
1854 0cb94105 Michael Hanselmann
1855 cb66225d Michael Hanselmann
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Returns all known job IDs.

    Only the queue directory is examined, as it's a requirement that all
    jobs are present on disk (so the _memcache doesn't hold any extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = (constants.JOB_FILE_RE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR))
    # Files not matching the job file pattern are ignored
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
1877 911a495b Iustin Pop
1878 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from memory or, failing that, from disk.

    A job found in the cache is returned directly. Otherwise the job file
    is read from the queue directory and the resulting object is added to
    the cache. A job file which can't be parsed is moved to the archive.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      assert cached.writable, "Found read-only job in memcache"
      return cached

    try:
      job = self._LoadJobFromDisk(job_id, False)
    except errors.JobFileCorrupted:
      queue_path = self._GetJobPath(job_id)
      archive_path = self._GetArchivedJobPath(job_id)
      if queue_path != archive_path:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(queue_path, archive_path)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1917 162c8636 Guido Trotter
1918 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Passed to L{_QueuedJob.Restore}; if None, the value
        depends on where the file was found (True for the queue directory,
        False for the archive)
    @rtype: L{_QueuedJob} or None
    @return: either None (no file found, or nothing read from it) or the
        job object
    @raise errors.JobFileCorrupted: if the file contents can't be parsed or
        restored
    @raise EnvironmentError: if reading a file fails for any reason other
        than the file not existing

    """
    # Candidate locations, each with the default for "writable"
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        # A missing file just means the next location is tried
        if err.errno != errno.ENOENT:
          raise
      else:
        # File was read; "writable_default" now belongs to this location
        break

    # Also taken when a file was read but yielded nothing (falsy value)
    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      # Any failure while parsing/restoring is reported as a corrupted file
      raise errors.JobFileCorrupted(err)

    return job
1963 f1da30e6 Michael Hanselmann
1964 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Loads the given job file from disk without raising on bad files.

    Works like L{_LoadJobFromDisk}, except that a corrupted job file or an
    error while reading it is logged and results in None being returned.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
1984 0f9c08dc Guido Trotter
1985 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Recomputes the queue size from the job files on disk.

    """
    # The order is irrelevant for counting, hence no sorting
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)
1990 20571a26 Guido Trotter
1991 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Changes the drain flag of the queue.

    The flag is first handed to L{jstore.SetDrainFlag} and then remembered
    on the queue object.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    jstore.SetDrainFlag(drain_flag)

    # Keep the in-memory copy in sync only after the call above succeeded
    self._drained = drain_flag

    return True
2005 3ccafd0e Iustin Pop
2006 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Creates and stores a new job.

    The opcodes are validated (priority and dependencies), the job is
    written to disk, the queue size is increased and the job is added to
    the cache. Handing the job over to the workers is left to the caller.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for (pos, queued_op) in enumerate(job.ops):
      # Check priority
      prio = queued_op.priority
      if prio not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (pos, prio, allowed))

      # Check dependencies
      deps = getattr(queued_op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(deps):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (pos, opcodes.TNoRelativeJobDependencies,
                                   deps))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
2051 f1da30e6 Michael Hanselmann
2052 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Submit a single job and hand it over to the worker pool.

    A new job ID is allocated, the job is created through
    L{_SubmitJobUnlocked} and afterwards enqueued for processing.

    @param ops: opcodes of the job, passed unchanged to
        L{_SubmitJobUnlocked}
    @return: the ID allocated for the new job
    @see: L{_SubmitJobUnlocked}

    """
    serials = self._NewSerialsUnlocked(1)

    # Exactly one serial was requested, the unpacking enforces this
    (job_id, ) = serials

    new_job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobsUnlocked([new_job])

    return job_id
2064 2971c913 Iustin Pop
2065 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Submit several jobs at once and enqueue those which were created.

    One job ID is allocated per entry in C{jobs}; the actual work is done by
    L{_SubmitManyJobsUnlocked}, called with an empty list of previous job
    IDs.

    @type jobs: list
    @param jobs: one sequence of opcodes per job
    @rtype: list
    @return: one C{(status, data)} pair per job, as produced by
        L{_SubmitManyJobsUnlocked}
    @see: L{_SubmitJobUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(len(jobs))

    (submit_results, created) = \
      self._SubmitManyJobsUnlocked(jobs, new_ids, [])

    # Only jobs which were actually created are given to the workers
    self._EnqueueJobsUnlocked(created)

    return submit_results
2082 2971c913 Iustin Pop
2083 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Builds the error text reported for a failed job submission.

    @param msg: the error message
    @param ops: opcodes of the job; the result of each opcode's C{Summary()}
        is appended to the message
    @return: message followed by the comma-joined opcode summaries

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)

    return "%s; opcodes %s" % (msg, summaries)
2090 b247c6fc Michael Hanselmann
2091 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Replaces relative job IDs in a list of dependencies.

    @type resolve_fn: callable
    @param resolve_fn: called with a relative job ID, must return the
        resolved job ID or raise C{IndexError} if it can't be resolved
    @type deps: list
    @param deps: dependencies as C{(job ID, status)} pairs
    @rtype: tuple
    @return: C{(True, resolved dependencies)} on success, or C{(False, error
        message)} as soon as one relative job ID can't be resolved

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Not a relative job ID, keep it as it is
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0

      try:
        absolute_id = resolve_fn(dep_job_id)
      except IndexError:
        # Abort, entries resolved so far are discarded
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((absolute_id, dep_status))

    return (True, resolved)
2119 b247c6fc Michael Hanselmann
2120 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2121 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2122 b247c6fc Michael Hanselmann

2123 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2124 b247c6fc Michael Hanselmann

2125 b247c6fc Michael Hanselmann
    """
2126 b247c6fc Michael Hanselmann
    results = []
2127 b247c6fc Michael Hanselmann
    added_jobs = []
2128 b247c6fc Michael Hanselmann
2129 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2130 b247c6fc Michael Hanselmann
      assert reljobid < 0
2131 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2132 b247c6fc Michael Hanselmann
2133 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2134 b247c6fc Michael Hanselmann
      for op in ops:
2135 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2136 b247c6fc Michael Hanselmann
          (status, data) = \
2137 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2138 b247c6fc Michael Hanselmann
                                         op.depends)
2139 b247c6fc Michael Hanselmann
          if not status:
2140 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2141 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2142 b247c6fc Michael Hanselmann
            break
2143 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2144 b247c6fc Michael Hanselmann
          op.depends = data
2145 b247c6fc Michael Hanselmann
      else:
2146 b247c6fc Michael Hanselmann
        try:
2147 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2148 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2149 b247c6fc Michael Hanselmann
          status = False
2150 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2151 b247c6fc Michael Hanselmann
        else:
2152 b247c6fc Michael Hanselmann
          status = True
2153 b247c6fc Michael Hanselmann
          data = job_id
2154 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2155 b247c6fc Michael Hanselmann
2156 b247c6fc Michael Hanselmann
      results.append((status, data))
2157 b247c6fc Michael Hanselmann
2158 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2159 b247c6fc Michael Hanselmann
2160 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Adds jobs to the worker pool's queue.

    Wrapper around L{_EnqueueJobsUnlocked} decorated with
    C{locking.ssynchronized}.

    @type jobs: list
    @param jobs: List of all jobs
    @return: whatever L{_EnqueueJobsUnlocked} returns

    """
    outcome = self._EnqueueJobsUnlocked(jobs)
    return outcome
2169 75d81fc8 Michael Hanselmann
2170 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Adds jobs to the worker pool's queue; the lock must be held.

    Every job becomes one task whose priority is the result of the job's
    C{CalcPriority()}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    # Each task is a tuple of arguments, here just the job itself
    tasks = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]

    self._wpool.AddManyTasks(tasks, priority=priorities)
2180 7b5c4a69 Michael Hanselmann
2181 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    The job is loaded read-only from disk and its calculated status is
    returned.

    @type job_id: string
    @param job_id: Job ID; values which are not strings are converted using
        C{jstore.FormatJobID} first
    @return: the result of the job's C{CalcStatus()}
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = jstore.FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    # The existence check must come before anything touches the job object;
    # otherwise a job which failed to load would trigger an AttributeError
    # in the assertion below instead of the documented JobLost
    if not job:
      raise errors.JobLost("Job %s not found" % job_id)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    return job.CalcStatus()
2203 b95479a5 Michael Hanselmann
2204 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Writes a job's current state to its file in the queue.

    This function needs to be called after a job has been modified so the
    changes are written to disk and, if requested, replicated to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # Consistency checks, skipped completely when running optimized: a
      # job must have an end timestamp if and only if it's finalized
      is_final = job.CalcStatus() in constants.JOBS_FINALIZED
      lacks_end = job.end_timestamp is None
      assert is_final != lacks_end
      assert job.writable, "Can't update read-only job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())

    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2227 ac0930b9 Iustin Pop
2228 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits until a job changes or the timeout expires.

    The work is delegated to a L{_WaitForJobChangesHelper} instance, which
    receives the path of the job file and a function loading the job
    read-only from disk.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    # Loader bound to this job, always producing a read-only job object
    loader = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                            writable=False)

    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, loader,
                  fields, prev_job_info, prev_log_serial, timeout)
2259 dfe57c22 Michael Hanselmann
2260 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Tries to cancel a job.

    The job is loaded and its C{Cancel()} method decides whether cancelling
    is possible. The job file is rewritten only if it reports success.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}; C{success} is C{False} if the job
        can't be found

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (cancelled, message) = job.Cancel()

    if cancelled:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (cancelled, message)
2288 fbf0262f Michael Hanselmann
2289 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Moves the files of finalized jobs to the archive.

    Jobs whose status is not in C{constants.JOBS_FINALIZED} are skipped.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of jobs selected for archival (the outcome of the
        individual renames is not checked)

    """
    to_archive = []
    renames = []

    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() in constants.JOBS_FINALIZED:
        to_archive.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in to_archive))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()

    return len(to_archive)
2326 78d12585 Michael Hanselmann
2327 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    The job is loaded and passed to L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; C{False} if the job can't be loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived = self._ArchiveJobsUnlocked([job])
      return archived == 1

    logging.debug("Job %s not found", job_id)
    return False
2348 07cd723a Iustin Pop
2349 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives jobs based on their age.

    A job's age is taken from its end timestamp; if there's none the start
    timestamp is used and, failing that, the received timestamp. Jobs older
    than C{age} are handed to L{_ArchiveJobsUnlocked} in batches of ten,
    which only archives finalized jobs. The special age C{-1} selects all
    jobs.

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: number of seconds after which no further jobs are
        loaded; jobs already collected are still archived
    @rtype: tuple
    @return: C{(number of archived jobs, number of job IDs the loop didn't
        reach)}; the job at which the timeout was detected counts as reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    deadline = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    batch = []

    for (idx, job_id) in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > deadline:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp available for this job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      if age == -1 or now - job_age[0] > age:
        batch.append(job)

        # Archive 10 jobs at a time
        if len(batch) >= 10:
          archived_count += self._ArchiveJobsUnlocked(batch)
          batch = []

    # Archive whatever is left over from the last, incomplete batch
    if batch:
      archived_count += self._ArchiveJobsUnlocked(batch)

    return (archived_count, len(all_job_ids) - last_touched)
2404 07cd723a Iustin Pop
2405 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it needs.

    If the query doesn't name specific jobs, all job IDs of the queue are
    used and jobs which fail to load are left out. If it does, jobs which
    fail to load are included with C{None} as the job object.

    @param fields: wanted fields, passed to L{query.Query}
    @param qfilter: query filter, passed to L{query.Query}
    @rtype: tuple
    @return: C{(query object, list of (job ID, job) pairs, whether all jobs
        were listed)}

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    wanted_ids = qobj.RequestedNames()
    list_all = (wanted_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      wanted_ids = self._GetJobIDsUnlocked()

    loaded = []

    for job_id in wanted_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is None and list_all:
        # Unloadable jobs are only reported if explicitly requested
        continue
      loaded.append((job_id, job))

    return (qobj, loaded, list_all)
2426 e07f7f7a Michael Hanselmann
2427 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Queries jobs in the queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the result of L{query.GetQueryResponse} for the prepared query

    """
    prepared = self._Query(fields, qfilter)
    (qobj, job_data, list_all) = prepared

    return query.GetQueryResponse(qobj, job_data, sort_by_name=list_all)
2439 e07f7f7a Michael Hanselmann
2440 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Queries jobs in the queue, returning old-style results.

    The job IDs are turned into a filter on the C{id} field using
    C{qlang.MakeSimpleFilter}.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    id_filter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, job_data, list_all) = self._Query(fields, id_filter)

    return qobj.OldStyleQuery(job_data, sort_by_name=list_all)
2457 e2715f69 Michael Hanselmann
2458 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepares the job queue for being stopped.

    On the first call the queue is marked as no longer accepting jobs and
    the worker pool is deactivated so it stops processing pending tasks.
    Further calls only report whether tasks are still running; as long as
    that's the case the queue is not yet ready for L{Shutdown}. Queued and
    unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    still_accepting = self._accepting_jobs

    if still_accepting:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    has_running = self._wpool.HasRunningTasks()
    return has_running
2482 6d5ea385 Michael Hanselmann
2483 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This terminates the workers of the worker pool and closes the queue's
    file lock.

    """
    self._wpool.TerminateWorkers()

    # Close the lock and drop the reference to it
    filelock = self._queue_filelock
    filelock.Close()
    self._queue_filelock = None