Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ cffbbae7

History | View | Annotate | Download (72.8 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 76b62028 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 b95479a5 Michael Hanselmann
from ganeti import ht
60 a06c6ae8 Michael Hanselmann
from ganeti import query
61 a06c6ae8 Michael Hanselmann
from ganeti import qlang
62 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
63 cffbbae7 Michael Hanselmann
from ganeti import vcluster
64 e2715f69 Michael Hanselmann
65 fbf0262f Michael Hanselmann
66 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
67 e2715f69 Michael Hanselmann
68 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
69 ebb80afa Guido Trotter
_LOCK = "_lock"
70 ebb80afa Guido Trotter
_QUEUE = "_queue"
71 99bd4f0a Guido Trotter
72 498ae1cc Iustin Pop
73 9728ae5d Iustin Pop
class CancelJob(Exception):
74 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
75 fbf0262f Michael Hanselmann

76 fbf0262f Michael Hanselmann
  """
77 fbf0262f Michael Hanselmann
78 fbf0262f Michael Hanselmann
79 70552c46 Michael Hanselmann
def TimeStampNow():
80 ea03467c Iustin Pop
  """Returns the current timestamp.
81 ea03467c Iustin Pop

82 ea03467c Iustin Pop
  @rtype: tuple
83 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
84 ea03467c Iustin Pop

85 ea03467c Iustin Pop
  """
86 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
87 70552c46 Michael Hanselmann
88 70552c46 Michael Hanselmann
89 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
90 cffbbae7 Michael Hanselmann
  """Updates job queue file after virtualizing filename.
91 cffbbae7 Michael Hanselmann

92 cffbbae7 Michael Hanselmann
  """
93 cffbbae7 Michael Hanselmann
  virt_file_name = vcluster.MakeVirtualPath(file_name)
94 cffbbae7 Michael Hanselmann
  return runner.call_jobqueue_update(names, virt_file_name, content)
95 cffbbae7 Michael Hanselmann
96 cffbbae7 Michael Hanselmann
97 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
98 a06c6ae8 Michael Hanselmann
  """Wrapper for job queries.
99 a06c6ae8 Michael Hanselmann

100 a06c6ae8 Michael Hanselmann
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
101 a06c6ae8 Michael Hanselmann

102 a06c6ae8 Michael Hanselmann
  """
103 a06c6ae8 Michael Hanselmann
  def __init__(self, fields):
104 a06c6ae8 Michael Hanselmann
    """Initializes this class.
105 a06c6ae8 Michael Hanselmann

106 a06c6ae8 Michael Hanselmann
    """
107 a06c6ae8 Michael Hanselmann
    self._query = query.Query(query.JOB_FIELDS, fields)
108 a06c6ae8 Michael Hanselmann
109 a06c6ae8 Michael Hanselmann
  def __call__(self, job):
110 a06c6ae8 Michael Hanselmann
    """Executes a job query using cached field list.
111 a06c6ae8 Michael Hanselmann

112 a06c6ae8 Michael Hanselmann
    """
113 a06c6ae8 Michael Hanselmann
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
114 a06c6ae8 Michael Hanselmann
115 a06c6ae8 Michael Hanselmann
116 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
117 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
118 e2715f69 Michael Hanselmann

119 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
120 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
121 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
122 ea03467c Iustin Pop
  @ivar status: the current status
123 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
124 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
125 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
126 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
127 f1048938 Iustin Pop

128 e2715f69 Michael Hanselmann
  """
129 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
130 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
131 66d895a8 Iustin Pop
               "__weakref__"]
132 66d895a8 Iustin Pop
133 85f03e0d Michael Hanselmann
  def __init__(self, op):
134 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
135 ea03467c Iustin Pop

136 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
137 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
138 ea03467c Iustin Pop

139 ea03467c Iustin Pop
    """
140 85f03e0d Michael Hanselmann
    self.input = op
141 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
142 85f03e0d Michael Hanselmann
    self.result = None
143 85f03e0d Michael Hanselmann
    self.log = []
144 70552c46 Michael Hanselmann
    self.start_timestamp = None
145 b9b5abcb Iustin Pop
    self.exec_timestamp = None
146 70552c46 Michael Hanselmann
    self.end_timestamp = None
147 f1da30e6 Michael Hanselmann
148 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
149 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
150 8f5c488d Michael Hanselmann
151 f1da30e6 Michael Hanselmann
  @classmethod
152 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
153 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
154 ea03467c Iustin Pop

155 ea03467c Iustin Pop
    @type state: dict
156 ea03467c Iustin Pop
    @param state: the serialized state
157 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
158 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
159 ea03467c Iustin Pop

160 ea03467c Iustin Pop
    """
161 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
162 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
163 85f03e0d Michael Hanselmann
    obj.status = state["status"]
164 85f03e0d Michael Hanselmann
    obj.result = state["result"]
165 85f03e0d Michael Hanselmann
    obj.log = state["log"]
166 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
167 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
168 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
169 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
170 f1da30e6 Michael Hanselmann
    return obj
171 f1da30e6 Michael Hanselmann
172 f1da30e6 Michael Hanselmann
  def Serialize(self):
173 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
174 ea03467c Iustin Pop

175 ea03467c Iustin Pop
    @rtype: dict
176 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
177 ea03467c Iustin Pop

178 ea03467c Iustin Pop
    """
179 6c5a7090 Michael Hanselmann
    return {
180 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
181 6c5a7090 Michael Hanselmann
      "status": self.status,
182 6c5a7090 Michael Hanselmann
      "result": self.result,
183 6c5a7090 Michael Hanselmann
      "log": self.log,
184 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
185 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
186 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
187 8f5c488d Michael Hanselmann
      "priority": self.priority,
188 6c5a7090 Michael Hanselmann
      }
189 f1048938 Iustin Pop
190 e2715f69 Michael Hanselmann
191 e2715f69 Michael Hanselmann
class _QueuedJob(object):
192 e2715f69 Michael Hanselmann
  """In-memory job representation.
193 e2715f69 Michael Hanselmann

194 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
195 ea03467c Iustin Pop
  be taken care of by users of this class.
196 ea03467c Iustin Pop

197 ea03467c Iustin Pop
  @type queue: L{JobQueue}
198 ea03467c Iustin Pop
  @ivar queue: the parent queue
199 ea03467c Iustin Pop
  @ivar id: the job ID
200 ea03467c Iustin Pop
  @type ops: list
201 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
202 ea03467c Iustin Pop
  @type log_serial: int
203 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
204 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
205 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
206 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
207 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
208 e2715f69 Michael Hanselmann

209 e2715f69 Michael Hanselmann
  """
210 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
211 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
212 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
213 c0f6d0d8 Michael Hanselmann
               "__weakref__", "processor_lock", "writable"]
214 66d895a8 Iustin Pop
215 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
216 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
217 ea03467c Iustin Pop

218 ea03467c Iustin Pop
    @type queue: L{JobQueue}
219 ea03467c Iustin Pop
    @param queue: our parent queue
220 ea03467c Iustin Pop
    @type job_id: job_id
221 ea03467c Iustin Pop
    @param job_id: our job id
222 ea03467c Iustin Pop
    @type ops: list
223 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
224 ea03467c Iustin Pop
        in _QueuedOpCodes
225 c0f6d0d8 Michael Hanselmann
    @type writable: bool
226 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
227 ea03467c Iustin Pop

228 ea03467c Iustin Pop
    """
229 e2715f69 Michael Hanselmann
    if not ops:
230 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
231 e2715f69 Michael Hanselmann
232 85f03e0d Michael Hanselmann
    self.queue = queue
233 76b62028 Iustin Pop
    self.id = int(job_id)
234 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
235 6c5a7090 Michael Hanselmann
    self.log_serial = 0
236 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
237 c56ec146 Iustin Pop
    self.start_timestamp = None
238 c56ec146 Iustin Pop
    self.end_timestamp = None
239 6c5a7090 Michael Hanselmann
240 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
241 fa4aa6b4 Michael Hanselmann
242 fa4aa6b4 Michael Hanselmann
  @staticmethod
243 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
244 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
245 fa4aa6b4 Michael Hanselmann

246 fa4aa6b4 Michael Hanselmann
    """
247 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
248 03b63608 Michael Hanselmann
    obj.ops_iter = None
249 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
250 f8a4adfa Michael Hanselmann
251 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
252 f8a4adfa Michael Hanselmann
    if writable:
253 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
254 f8a4adfa Michael Hanselmann
    else:
255 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
256 be760ba8 Michael Hanselmann
257 9fa2e150 Michael Hanselmann
  def __repr__(self):
258 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
259 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
260 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
261 9fa2e150 Michael Hanselmann
262 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
263 9fa2e150 Michael Hanselmann
264 f1da30e6 Michael Hanselmann
  @classmethod
265 c0f6d0d8 Michael Hanselmann
  def Restore(cls, queue, state, writable):
266 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
267 ea03467c Iustin Pop

268 ea03467c Iustin Pop
    @type queue: L{JobQueue}
269 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
270 ea03467c Iustin Pop
    @type state: dict
271 ea03467c Iustin Pop
    @param state: the serialized state
272 c0f6d0d8 Michael Hanselmann
    @type writable: bool
273 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
274 ea03467c Iustin Pop
    @rtype: _JobQueue
275 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
276 ea03467c Iustin Pop

277 ea03467c Iustin Pop
    """
278 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
279 85f03e0d Michael Hanselmann
    obj.queue = queue
280 76b62028 Iustin Pop
    obj.id = int(state["id"])
281 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
282 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
283 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
284 6c5a7090 Michael Hanselmann
285 6c5a7090 Michael Hanselmann
    obj.ops = []
286 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
287 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
288 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
289 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
290 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
291 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
292 6c5a7090 Michael Hanselmann
293 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
294 be760ba8 Michael Hanselmann
295 f1da30e6 Michael Hanselmann
    return obj
296 f1da30e6 Michael Hanselmann
297 f1da30e6 Michael Hanselmann
  def Serialize(self):
298 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
299 ea03467c Iustin Pop

300 ea03467c Iustin Pop
    @rtype: dict
301 ea03467c Iustin Pop
    @return: the serialized state
302 ea03467c Iustin Pop

303 ea03467c Iustin Pop
    """
304 f1da30e6 Michael Hanselmann
    return {
305 f1da30e6 Michael Hanselmann
      "id": self.id,
306 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
307 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
308 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
309 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
310 f1da30e6 Michael Hanselmann
      }
311 f1da30e6 Michael Hanselmann
312 85f03e0d Michael Hanselmann
  def CalcStatus(self):
313 ea03467c Iustin Pop
    """Compute the status of this job.
314 ea03467c Iustin Pop

315 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
316 ea03467c Iustin Pop
    based on their status, computes the job status.
317 ea03467c Iustin Pop

318 ea03467c Iustin Pop
    The algorithm is:
319 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
320 ea03467c Iustin Pop
        status will be the same
321 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
322 ea03467c Iustin Pop
          - waitlock
323 fbf0262f Michael Hanselmann
          - canceling
324 ea03467c Iustin Pop
          - running
325 ea03467c Iustin Pop

326 ea03467c Iustin Pop
        will determine the job status
327 ea03467c Iustin Pop

328 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
329 ea03467c Iustin Pop
        and the job status will be the same
330 ea03467c Iustin Pop

331 ea03467c Iustin Pop
    @return: the job status
332 ea03467c Iustin Pop

333 ea03467c Iustin Pop
    """
334 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
335 e2715f69 Michael Hanselmann
336 e2715f69 Michael Hanselmann
    all_success = True
337 85f03e0d Michael Hanselmann
    for op in self.ops:
338 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
339 e2715f69 Michael Hanselmann
        continue
340 e2715f69 Michael Hanselmann
341 e2715f69 Michael Hanselmann
      all_success = False
342 e2715f69 Michael Hanselmann
343 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
344 e2715f69 Michael Hanselmann
        pass
345 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
346 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
347 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
348 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
349 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
350 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
351 fbf0262f Michael Hanselmann
        break
352 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
353 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
354 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
355 f1da30e6 Michael Hanselmann
        break
356 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
357 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
358 4cb1d919 Michael Hanselmann
        break
359 e2715f69 Michael Hanselmann
360 e2715f69 Michael Hanselmann
    if all_success:
361 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
362 e2715f69 Michael Hanselmann
363 e2715f69 Michael Hanselmann
    return status
364 e2715f69 Michael Hanselmann
365 8f5c488d Michael Hanselmann
  def CalcPriority(self):
366 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
367 8f5c488d Michael Hanselmann

368 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
369 8f5c488d Michael Hanselmann
    priority is used.
370 8f5c488d Michael Hanselmann

371 8f5c488d Michael Hanselmann
    @rtype: int
372 8f5c488d Michael Hanselmann

373 8f5c488d Michael Hanselmann
    """
374 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
375 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
376 8f5c488d Michael Hanselmann
377 8f5c488d Michael Hanselmann
    if not priorities:
378 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
379 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
380 8f5c488d Michael Hanselmann
381 8f5c488d Michael Hanselmann
    return min(priorities)
382 8f5c488d Michael Hanselmann
383 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
384 ea03467c Iustin Pop
    """Selectively returns the log entries.
385 ea03467c Iustin Pop

386 ea03467c Iustin Pop
    @type newer_than: None or int
387 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
388 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
389 ea03467c Iustin Pop
        than this value
390 ea03467c Iustin Pop
    @rtype: list
391 ea03467c Iustin Pop
    @return: the list of the log entries selected
392 ea03467c Iustin Pop

393 ea03467c Iustin Pop
    """
394 6c5a7090 Michael Hanselmann
    if newer_than is None:
395 6c5a7090 Michael Hanselmann
      serial = -1
396 6c5a7090 Michael Hanselmann
    else:
397 6c5a7090 Michael Hanselmann
      serial = newer_than
398 6c5a7090 Michael Hanselmann
399 6c5a7090 Michael Hanselmann
    entries = []
400 6c5a7090 Michael Hanselmann
    for op in self.ops:
401 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
402 6c5a7090 Michael Hanselmann
403 6c5a7090 Michael Hanselmann
    return entries
404 6c5a7090 Michael Hanselmann
405 6a290889 Guido Trotter
  def GetInfo(self, fields):
406 6a290889 Guido Trotter
    """Returns information about a job.
407 6a290889 Guido Trotter

408 6a290889 Guido Trotter
    @type fields: list
409 6a290889 Guido Trotter
    @param fields: names of fields to return
410 6a290889 Guido Trotter
    @rtype: list
411 6a290889 Guido Trotter
    @return: list with one element for each field
412 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
413 6a290889 Guido Trotter
        has been passed
414 6a290889 Guido Trotter

415 6a290889 Guido Trotter
    """
416 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
417 6a290889 Guido Trotter
418 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
419 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
420 34327f51 Iustin Pop

421 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
422 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
423 34327f51 Iustin Pop
    finalised are not changed.
424 34327f51 Iustin Pop

425 34327f51 Iustin Pop
    @param status: a given opcode status
426 34327f51 Iustin Pop
    @param result: the opcode result
427 34327f51 Iustin Pop

428 34327f51 Iustin Pop
    """
429 747f6113 Michael Hanselmann
    not_marked = True
430 747f6113 Michael Hanselmann
    for op in self.ops:
431 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
432 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
433 747f6113 Michael Hanselmann
        continue
434 747f6113 Michael Hanselmann
      op.status = status
435 747f6113 Michael Hanselmann
      op.result = result
436 747f6113 Michael Hanselmann
      not_marked = False
437 34327f51 Iustin Pop
438 66bd7445 Michael Hanselmann
  def Finalize(self):
439 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
440 66bd7445 Michael Hanselmann

441 66bd7445 Michael Hanselmann
    """
442 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
443 66bd7445 Michael Hanselmann
444 099b2870 Michael Hanselmann
  def Cancel(self):
445 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
446 a0d2fe2c Michael Hanselmann

447 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
448 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
449 a0d2fe2c Michael Hanselmann
      as canceling and a text message
450 a0d2fe2c Michael Hanselmann

451 a0d2fe2c Michael Hanselmann
    """
452 099b2870 Michael Hanselmann
    status = self.CalcStatus()
453 099b2870 Michael Hanselmann
454 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
455 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
456 099b2870 Michael Hanselmann
                             "Job canceled by request")
457 66bd7445 Michael Hanselmann
      self.Finalize()
458 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
459 099b2870 Michael Hanselmann
460 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
461 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
462 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
463 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
464 099b2870 Michael Hanselmann
465 86b16e9d Michael Hanselmann
    else:
466 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
467 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
468 099b2870 Michael Hanselmann
469 f1048938 Iustin Pop
470 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
471 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
472 031a3e57 Michael Hanselmann
    """Initializes this class.
473 ea03467c Iustin Pop

474 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
475 031a3e57 Michael Hanselmann
    @param queue: Job queue
476 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
477 031a3e57 Michael Hanselmann
    @param job: Job object
478 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
479 031a3e57 Michael Hanselmann
    @param op: OpCode
480 031a3e57 Michael Hanselmann

481 031a3e57 Michael Hanselmann
    """
482 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
483 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
484 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
485 031a3e57 Michael Hanselmann
486 031a3e57 Michael Hanselmann
    self._queue = queue
487 031a3e57 Michael Hanselmann
    self._job = job
488 031a3e57 Michael Hanselmann
    self._op = op
489 031a3e57 Michael Hanselmann
490 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
491 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
492 dc1e2262 Michael Hanselmann

493 dc1e2262 Michael Hanselmann
    """
494 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
495 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
496 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
497 dc1e2262 Michael Hanselmann
      raise CancelJob()
498 dc1e2262 Michael Hanselmann
499 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
500 031a3e57 Michael Hanselmann
  def NotifyStart(self):
501 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
502 e92376d7 Iustin Pop

503 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
504 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
505 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
506 47099cd1 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
507 e92376d7 Iustin Pop

508 e92376d7 Iustin Pop
    """
509 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
510 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
511 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
512 fbf0262f Michael Hanselmann
513 271daef8 Iustin Pop
    # Cancel here if we were asked to
514 dc1e2262 Michael Hanselmann
    self._CheckCancel()
515 fbf0262f Michael Hanselmann
516 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
517 9bdab621 Michael Hanselmann
518 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
519 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
520 271daef8 Iustin Pop
521 271daef8 Iustin Pop
    # And finally replicate the job status
522 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
523 031a3e57 Michael Hanselmann
524 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
525 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
526 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
527 9bf5e01f Guido Trotter

528 9bf5e01f Guido Trotter
    """
529 9bf5e01f Guido Trotter
    self._job.log_serial += 1
530 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
531 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
532 9bf5e01f Guido Trotter
533 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
534 031a3e57 Michael Hanselmann
    """Append a log entry.
535 031a3e57 Michael Hanselmann

536 031a3e57 Michael Hanselmann
    """
537 031a3e57 Michael Hanselmann
    assert len(args) < 3
538 031a3e57 Michael Hanselmann
539 031a3e57 Michael Hanselmann
    if len(args) == 1:
540 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
541 031a3e57 Michael Hanselmann
      log_msg = args[0]
542 031a3e57 Michael Hanselmann
    else:
543 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
544 031a3e57 Michael Hanselmann
545 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
546 031a3e57 Michael Hanselmann
    # precision.
547 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
548 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
549 031a3e57 Michael Hanselmann
550 acf931b7 Michael Hanselmann
  def CheckCancel(self):
551 acf931b7 Michael Hanselmann
    """Check whether job has been cancelled.
552 ef2df7d3 Michael Hanselmann

553 ef2df7d3 Michael Hanselmann
    """
554 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
555 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
556 dc1e2262 Michael Hanselmann
557 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
558 dc1e2262 Michael Hanselmann
    self._CheckCancel()
559 dc1e2262 Michael Hanselmann
560 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
561 6a373640 Michael Hanselmann
    """Submits jobs for processing.
562 6a373640 Michael Hanselmann

563 6a373640 Michael Hanselmann
    See L{JobQueue.SubmitManyJobs}.
564 6a373640 Michael Hanselmann

565 6a373640 Michael Hanselmann
    """
566 6a373640 Michael Hanselmann
    # Locking is done in job queue
567 6a373640 Michael Hanselmann
    return self._queue.SubmitManyJobs(jobs)
568 6a373640 Michael Hanselmann
569 031a3e57 Michael Hanselmann
570 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
571 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
572 989a8bee Michael Hanselmann
    """Initializes this class.
573 6c2549d6 Guido Trotter

574 989a8bee Michael Hanselmann
    @type fields: list of strings
575 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
576 989a8bee Michael Hanselmann
    @type prev_job_info: string
577 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
578 989a8bee Michael Hanselmann
    @type prev_log_serial: string
579 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
580 6c2549d6 Guido Trotter

581 989a8bee Michael Hanselmann
    """
582 dc2879ea Michael Hanselmann
    self._squery = _SimpleJobQuery(fields)
583 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
584 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
585 6c2549d6 Guido Trotter
586 989a8bee Michael Hanselmann
  def __call__(self, job):
587 989a8bee Michael Hanselmann
    """Checks whether job has changed.
588 6c2549d6 Guido Trotter

589 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
590 989a8bee Michael Hanselmann
    @param job: Job object
591 6c2549d6 Guido Trotter

592 6c2549d6 Guido Trotter
    """
593 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Expected read-only job"
594 c0f6d0d8 Michael Hanselmann
595 989a8bee Michael Hanselmann
    status = job.CalcStatus()
596 dc2879ea Michael Hanselmann
    job_info = self._squery(job)
597 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
598 6c2549d6 Guido Trotter
599 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
600 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
601 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
602 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
603 6c2549d6 Guido Trotter
    # significantly different.
604 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
605 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
606 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
607 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
608 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
609 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
610 6c2549d6 Guido Trotter
611 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
612 6c2549d6 Guido Trotter
    # no changes.
613 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
614 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
615 47099cd1 Michael Hanselmann
                       constants.JOB_STATUS_WAITING) or
616 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
617 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
618 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
619 989a8bee Michael Hanselmann
      return (job_info, log_entries)
620 6c2549d6 Guido Trotter
621 989a8bee Michael Hanselmann
    return None
622 989a8bee Michael Hanselmann
623 989a8bee Michael Hanselmann
624 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
625 989a8bee Michael Hanselmann
  def __init__(self, filename):
626 989a8bee Michael Hanselmann
    """Initializes this class.
627 989a8bee Michael Hanselmann

628 989a8bee Michael Hanselmann
    @type filename: string
629 989a8bee Michael Hanselmann
    @param filename: Path to job file
630 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
631 6c2549d6 Guido Trotter

632 989a8bee Michael Hanselmann
    """
633 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
634 989a8bee Michael Hanselmann
    self._inotify_handler = \
635 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
636 989a8bee Michael Hanselmann
    self._notifier = \
637 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
638 989a8bee Michael Hanselmann
    try:
639 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
640 989a8bee Michael Hanselmann
    except Exception:
641 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
642 989a8bee Michael Hanselmann
      self._notifier.stop()
643 989a8bee Michael Hanselmann
      raise
644 989a8bee Michael Hanselmann
645 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
646 989a8bee Michael Hanselmann
    """Callback for inotify.
647 989a8bee Michael Hanselmann

648 989a8bee Michael Hanselmann
    """
649 6c2549d6 Guido Trotter
    if not notifier_enabled:
650 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
651 989a8bee Michael Hanselmann
652 989a8bee Michael Hanselmann
  def Wait(self, timeout):
653 989a8bee Michael Hanselmann
    """Waits for the job file to change.
654 989a8bee Michael Hanselmann

655 989a8bee Michael Hanselmann
    @type timeout: float
656 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
657 989a8bee Michael Hanselmann
    @return: Whether there have been events
658 989a8bee Michael Hanselmann

659 989a8bee Michael Hanselmann
    """
660 989a8bee Michael Hanselmann
    assert timeout >= 0
661 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
662 989a8bee Michael Hanselmann
    if have_events:
663 989a8bee Michael Hanselmann
      self._notifier.read_events()
664 989a8bee Michael Hanselmann
    self._notifier.process_events()
665 989a8bee Michael Hanselmann
    return have_events
666 989a8bee Michael Hanselmann
667 989a8bee Michael Hanselmann
  def Close(self):
668 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
669 989a8bee Michael Hanselmann

670 989a8bee Michael Hanselmann
    """
671 989a8bee Michael Hanselmann
    self._notifier.stop()
672 989a8bee Michael Hanselmann
673 989a8bee Michael Hanselmann
674 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
675 989a8bee Michael Hanselmann
  def __init__(self, filename):
676 989a8bee Michael Hanselmann
    """Initializes this class.
677 989a8bee Michael Hanselmann

678 989a8bee Michael Hanselmann
    @type filename: string
679 989a8bee Michael Hanselmann
    @param filename: Path to job file
680 989a8bee Michael Hanselmann

681 989a8bee Michael Hanselmann
    """
682 989a8bee Michael Hanselmann
    self._filewaiter = None
683 989a8bee Michael Hanselmann
    self._filename = filename
684 6c2549d6 Guido Trotter
685 989a8bee Michael Hanselmann
  def Wait(self, timeout):
686 989a8bee Michael Hanselmann
    """Waits for a job to change.
687 6c2549d6 Guido Trotter

688 989a8bee Michael Hanselmann
    @type timeout: float
689 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
690 989a8bee Michael Hanselmann
    @return: Whether there have been events
691 989a8bee Michael Hanselmann

692 989a8bee Michael Hanselmann
    """
693 989a8bee Michael Hanselmann
    if self._filewaiter:
694 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
695 989a8bee Michael Hanselmann
696 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
697 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
698 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
699 989a8bee Michael Hanselmann
    # race condition.
700 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
701 989a8bee Michael Hanselmann
702 989a8bee Michael Hanselmann
    return True
703 989a8bee Michael Hanselmann
704 989a8bee Michael Hanselmann
  def Close(self):
705 989a8bee Michael Hanselmann
    """Closes underlying waiter.
706 989a8bee Michael Hanselmann

707 989a8bee Michael Hanselmann
    """
708 989a8bee Michael Hanselmann
    if self._filewaiter:
709 989a8bee Michael Hanselmann
      self._filewaiter.Close()
710 989a8bee Michael Hanselmann
711 989a8bee Michael Hanselmann
712 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
713 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
714 989a8bee Michael Hanselmann

715 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
716 989a8bee Michael Hanselmann
  the current job status has changed.
717 989a8bee Michael Hanselmann

718 989a8bee Michael Hanselmann
  """
719 989a8bee Michael Hanselmann
  @staticmethod
720 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
721 dfc8824a Michael Hanselmann
    if counter.next() > 0:
722 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
723 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
724 dfc8824a Michael Hanselmann
      # changes/messages.
725 dfc8824a Michael Hanselmann
      time.sleep(0.1)
726 dfc8824a Michael Hanselmann
727 989a8bee Michael Hanselmann
    job = job_load_fn()
728 989a8bee Michael Hanselmann
    if not job:
729 989a8bee Michael Hanselmann
      raise errors.JobLost()
730 989a8bee Michael Hanselmann
731 989a8bee Michael Hanselmann
    result = check_fn(job)
732 989a8bee Michael Hanselmann
    if result is None:
733 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
734 989a8bee Michael Hanselmann
735 989a8bee Michael Hanselmann
    return result
736 989a8bee Michael Hanselmann
737 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
738 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
739 989a8bee Michael Hanselmann
    """Waits for changes on a job.
740 989a8bee Michael Hanselmann

741 989a8bee Michael Hanselmann
    @type filename: string
742 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
743 989a8bee Michael Hanselmann
    @type job_load_fn: callable
744 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
745 989a8bee Michael Hanselmann
    @type fields: list of strings
746 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
747 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
748 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
749 989a8bee Michael Hanselmann
    @type prev_log_serial: int
750 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
751 989a8bee Michael Hanselmann
    @type timeout: float
752 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
753 989a8bee Michael Hanselmann

754 989a8bee Michael Hanselmann
    """
755 dfc8824a Michael Hanselmann
    counter = itertools.count()
756 6c2549d6 Guido Trotter
    try:
757 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
758 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
759 989a8bee Michael Hanselmann
      try:
760 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
761 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
762 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
763 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
764 989a8bee Michael Hanselmann
      finally:
765 989a8bee Michael Hanselmann
        waiter.Close()
766 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
767 6c2549d6 Guido Trotter
      return None
768 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
769 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
770 6c2549d6 Guido Trotter
771 6c2549d6 Guido Trotter
772 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
773 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
774 6760e4ed Michael Hanselmann

775 6760e4ed Michael Hanselmann
  """
776 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
777 6760e4ed Michael Hanselmann
    to_encode = err
778 6760e4ed Michael Hanselmann
  else:
779 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
780 6760e4ed Michael Hanselmann
781 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
782 6760e4ed Michael Hanselmann
783 6760e4ed Michael Hanselmann
784 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
785 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
786 26d3fd2f Michael Hanselmann
    """Initializes this class.
787 26d3fd2f Michael Hanselmann

788 26d3fd2f Michael Hanselmann
    """
789 26d3fd2f Michael Hanselmann
    self._fn = fn
790 26d3fd2f Michael Hanselmann
    self._next = None
791 26d3fd2f Michael Hanselmann
792 26d3fd2f Michael Hanselmann
  def _Advance(self):
793 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
794 26d3fd2f Michael Hanselmann

795 26d3fd2f Michael Hanselmann
    """
796 26d3fd2f Michael Hanselmann
    if self._next is None:
797 26d3fd2f Michael Hanselmann
      self._next = self._fn()
798 26d3fd2f Michael Hanselmann
799 26d3fd2f Michael Hanselmann
  def Peek(self):
800 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
801 26d3fd2f Michael Hanselmann

802 26d3fd2f Michael Hanselmann
    """
803 26d3fd2f Michael Hanselmann
    self._Advance()
804 26d3fd2f Michael Hanselmann
    return self._next
805 26d3fd2f Michael Hanselmann
806 26d3fd2f Michael Hanselmann
  def Next(self):
807 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
808 26d3fd2f Michael Hanselmann

809 26d3fd2f Michael Hanselmann
    """
810 26d3fd2f Michael Hanselmann
    self._Advance()
811 26d3fd2f Michael Hanselmann
    result = self._next
812 26d3fd2f Michael Hanselmann
    self._next = None
813 26d3fd2f Michael Hanselmann
    return result
814 26d3fd2f Michael Hanselmann
815 26d3fd2f Michael Hanselmann
816 b80cc518 Michael Hanselmann
class _OpExecContext:
817 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
818 b80cc518 Michael Hanselmann
    """Initializes this class.
819 b80cc518 Michael Hanselmann

820 b80cc518 Michael Hanselmann
    """
821 b80cc518 Michael Hanselmann
    self.op = op
822 b80cc518 Michael Hanselmann
    self.index = index
823 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
824 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
825 b80cc518 Michael Hanselmann
826 b95479a5 Michael Hanselmann
    # Create local copy to modify
827 b95479a5 Michael Hanselmann
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
828 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
829 b95479a5 Michael Hanselmann
    else:
830 b95479a5 Michael Hanselmann
      self.jobdeps = None
831 b95479a5 Michael Hanselmann
832 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
833 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
834 26d3fd2f Michael Hanselmann
835 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
836 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
837 26d3fd2f Michael Hanselmann

838 26d3fd2f Michael Hanselmann
    """
839 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
840 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
841 26d3fd2f Michael Hanselmann
842 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
843 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
844 26d3fd2f Michael Hanselmann

845 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
846 26d3fd2f Michael Hanselmann

847 26d3fd2f Michael Hanselmann
    """
848 26d3fd2f Michael Hanselmann
    op = self.op
849 26d3fd2f Michael Hanselmann
850 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
851 26d3fd2f Michael Hanselmann
    # for locks?
852 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
853 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
854 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
855 26d3fd2f Michael Hanselmann
      op.priority -= 1
856 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
857 26d3fd2f Michael Hanselmann
      return True
858 26d3fd2f Michael Hanselmann
859 26d3fd2f Michael Hanselmann
    return False
860 26d3fd2f Michael Hanselmann
861 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
862 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
863 26d3fd2f Michael Hanselmann

864 26d3fd2f Michael Hanselmann
    """
865 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
866 26d3fd2f Michael Hanselmann
867 b80cc518 Michael Hanselmann
868 be760ba8 Michael Hanselmann
class _JobProcessor(object):
869 75d81fc8 Michael Hanselmann
  (DEFER,
870 75d81fc8 Michael Hanselmann
   WAITDEP,
871 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
872 75d81fc8 Michael Hanselmann
873 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
874 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
875 be760ba8 Michael Hanselmann
    """Initializes this class.
876 be760ba8 Michael Hanselmann

877 be760ba8 Michael Hanselmann
    """
878 be760ba8 Michael Hanselmann
    self.queue = queue
879 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
880 be760ba8 Michael Hanselmann
    self.job = job
881 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
882 be760ba8 Michael Hanselmann
883 be760ba8 Michael Hanselmann
  @staticmethod
884 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
885 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
886 be760ba8 Michael Hanselmann

887 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
888 be760ba8 Michael Hanselmann
    @param job: Job object
889 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
890 be760ba8 Michael Hanselmann

891 be760ba8 Michael Hanselmann
    """
892 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
893 be760ba8 Michael Hanselmann
    # lookups
894 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
895 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
896 03b63608 Michael Hanselmann
    if job.ops_iter is None:
897 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
898 be760ba8 Michael Hanselmann
899 be760ba8 Michael Hanselmann
    # Find next opcode to run
900 be760ba8 Michael Hanselmann
    while True:
901 be760ba8 Michael Hanselmann
      try:
902 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
903 be760ba8 Michael Hanselmann
      except StopIteration:
904 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
905 be760ba8 Michael Hanselmann
906 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
907 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
908 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
909 be760ba8 Michael Hanselmann
910 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
911 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
912 be760ba8 Michael Hanselmann
913 66bd7445 Michael Hanselmann
      if op.status not in constants.OPS_FINALIZED:
914 66bd7445 Michael Hanselmann
        return opctx
915 be760ba8 Michael Hanselmann
916 66bd7445 Michael Hanselmann
      # This is a job that was partially completed before master daemon
917 66bd7445 Michael Hanselmann
      # shutdown, so it can be expected that some opcodes are already
918 66bd7445 Michael Hanselmann
      # completed successfully (if any did error out, then the whole job
919 66bd7445 Michael Hanselmann
      # should have been aborted and not resubmitted for processing).
920 66bd7445 Michael Hanselmann
      logging.info("%s: opcode %s already processed, skipping",
921 66bd7445 Michael Hanselmann
                   opctx.log_prefix, opctx.summary)
922 be760ba8 Michael Hanselmann
923 be760ba8 Michael Hanselmann
  @staticmethod
924 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
925 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
926 be760ba8 Michael Hanselmann

927 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
928 be760ba8 Michael Hanselmann

929 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
930 be760ba8 Michael Hanselmann
    @param job: Job object
931 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
932 a38e8674 Michael Hanselmann
    @param op: Opcode object
933 be760ba8 Michael Hanselmann

934 be760ba8 Michael Hanselmann
    """
935 be760ba8 Michael Hanselmann
    assert op in job.ops
936 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
937 47099cd1 Michael Hanselmann
                         constants.OP_STATUS_WAITING)
938 5fd6b694 Michael Hanselmann
939 5fd6b694 Michael Hanselmann
    update = False
940 be760ba8 Michael Hanselmann
941 be760ba8 Michael Hanselmann
    op.result = None
942 5fd6b694 Michael Hanselmann
943 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
944 47099cd1 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITING
945 5fd6b694 Michael Hanselmann
      update = True
946 5fd6b694 Michael Hanselmann
947 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
948 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
949 5fd6b694 Michael Hanselmann
      update = True
950 be760ba8 Michael Hanselmann
951 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
952 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
953 5fd6b694 Michael Hanselmann
      update = True
954 5fd6b694 Michael Hanselmann
955 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
956 5fd6b694 Michael Hanselmann
957 5fd6b694 Michael Hanselmann
    return update
958 be760ba8 Michael Hanselmann
959 b95479a5 Michael Hanselmann
  @staticmethod
960 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
961 b95479a5 Michael Hanselmann
    """Checks if an opcode has dependencies and if so, processes them.
962 b95479a5 Michael Hanselmann

963 b95479a5 Michael Hanselmann
    @type queue: L{JobQueue}
964 b95479a5 Michael Hanselmann
    @param queue: Queue object
965 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
966 b95479a5 Michael Hanselmann
    @param job: Job object
967 b95479a5 Michael Hanselmann
    @type opctx: L{_OpExecContext}
968 b95479a5 Michael Hanselmann
    @param opctx: Opcode execution context
969 b95479a5 Michael Hanselmann
    @rtype: bool
970 b95479a5 Michael Hanselmann
    @return: Whether opcode will be re-scheduled by dependency tracker
971 b95479a5 Michael Hanselmann

972 b95479a5 Michael Hanselmann
    """
973 b95479a5 Michael Hanselmann
    op = opctx.op
974 b95479a5 Michael Hanselmann
975 b95479a5 Michael Hanselmann
    result = False
976 b95479a5 Michael Hanselmann
977 b95479a5 Michael Hanselmann
    while opctx.jobdeps:
978 b95479a5 Michael Hanselmann
      (dep_job_id, dep_status) = opctx.jobdeps[0]
979 b95479a5 Michael Hanselmann
980 b95479a5 Michael Hanselmann
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
981 b95479a5 Michael Hanselmann
                                                          dep_status)
982 b95479a5 Michael Hanselmann
      assert ht.TNonEmptyString(depmsg), "No dependency message"
983 b95479a5 Michael Hanselmann
984 b95479a5 Michael Hanselmann
      logging.info("%s: %s", opctx.log_prefix, depmsg)
985 b95479a5 Michael Hanselmann
986 b95479a5 Michael Hanselmann
      if depresult == _JobDependencyManager.CONTINUE:
987 b95479a5 Michael Hanselmann
        # Remove dependency and continue
988 b95479a5 Michael Hanselmann
        opctx.jobdeps.pop(0)
989 b95479a5 Michael Hanselmann
990 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.WAIT:
991 b95479a5 Michael Hanselmann
        # Need to wait for notification, dependency tracker will re-add job
992 b95479a5 Michael Hanselmann
        # to workerpool
993 b95479a5 Michael Hanselmann
        result = True
994 b95479a5 Michael Hanselmann
        break
995 b95479a5 Michael Hanselmann
996 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.CANCEL:
997 b95479a5 Michael Hanselmann
        # Job was cancelled, cancel this job as well
998 b95479a5 Michael Hanselmann
        job.Cancel()
999 b95479a5 Michael Hanselmann
        assert op.status == constants.OP_STATUS_CANCELING
1000 b95479a5 Michael Hanselmann
        break
1001 b95479a5 Michael Hanselmann
1002 b95479a5 Michael Hanselmann
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1003 b95479a5 Michael Hanselmann
                         _JobDependencyManager.ERROR):
1004 b95479a5 Michael Hanselmann
        # Job failed or there was an error, this job must fail
1005 b95479a5 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1006 b95479a5 Michael Hanselmann
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1007 b95479a5 Michael Hanselmann
        break
1008 b95479a5 Michael Hanselmann
1009 b95479a5 Michael Hanselmann
      else:
1010 b95479a5 Michael Hanselmann
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1011 b95479a5 Michael Hanselmann
                                     depresult)
1012 b95479a5 Michael Hanselmann
1013 b95479a5 Michael Hanselmann
    return result
1014 b95479a5 Michael Hanselmann
1015 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1016 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1017 be760ba8 Michael Hanselmann

1018 be760ba8 Michael Hanselmann
    """
1019 b80cc518 Michael Hanselmann
    op = opctx.op
1020 b80cc518 Michael Hanselmann
1021 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1022 be760ba8 Michael Hanselmann
1023 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1024 26d3fd2f Michael Hanselmann
1025 be760ba8 Michael Hanselmann
    try:
1026 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1027 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1028 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1029 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1030 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1031 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1032 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1033 9e49dfc5 Michael Hanselmann
1034 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1035 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1036 9e49dfc5 Michael Hanselmann
1037 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1038 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1039 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1040 9e49dfc5 Michael Hanselmann
1041 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1042 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1043 be760ba8 Michael Hanselmann
    except CancelJob:
1044 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1045 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1046 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1047 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1048 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1049 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1050 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1051 be760ba8 Michael Hanselmann
    else:
1052 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1053 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1054 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1055 be760ba8 Michael Hanselmann
1056 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1057 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1058 be760ba8 Michael Hanselmann

1059 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1060 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1061 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1062 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1063 be760ba8 Michael Hanselmann

1064 be760ba8 Michael Hanselmann
    """
1065 be760ba8 Michael Hanselmann
    queue = self.queue
1066 be760ba8 Michael Hanselmann
    job = self.job
1067 be760ba8 Michael Hanselmann
1068 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1069 be760ba8 Michael Hanselmann
1070 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1071 be760ba8 Michael Hanselmann
    try:
1072 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1073 be760ba8 Michael Hanselmann
1074 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1075 c0f6d0d8 Michael Hanselmann
1076 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1077 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1078 75d81fc8 Michael Hanselmann
        return self.FINISHED
1079 66bd7445 Michael Hanselmann
1080 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1081 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1082 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1083 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1084 26d3fd2f Michael Hanselmann
      else:
1085 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1086 26d3fd2f Michael Hanselmann
          _nextop_fn()
1087 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1088 26d3fd2f Michael Hanselmann
1089 b80cc518 Michael Hanselmann
      op = opctx.op
1090 be760ba8 Michael Hanselmann
1091 be760ba8 Michael Hanselmann
      # Consistency check
1092 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1093 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1094 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1095 be760ba8 Michael Hanselmann
1096 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1097 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1098 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1099 be760ba8 Michael Hanselmann
1100 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1101 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1102 26d3fd2f Michael Hanselmann
1103 b95479a5 Michael Hanselmann
      waitjob = None
1104 b95479a5 Michael Hanselmann
1105 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1106 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1107 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1108 30c945d0 Michael Hanselmann
1109 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1110 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1111 5fd6b694 Michael Hanselmann
          # Write to disk
1112 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1113 be760ba8 Michael Hanselmann
1114 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1115 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1116 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1117 b95479a5 Michael Hanselmann
        assert waitjob is None
1118 b95479a5 Michael Hanselmann
1119 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1120 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1121 be760ba8 Michael Hanselmann
1122 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1123 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1124 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1125 be760ba8 Michael Hanselmann
1126 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1127 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1128 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1129 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1130 be760ba8 Michael Hanselmann
1131 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1132 b95479a5 Michael Hanselmann
1133 b95479a5 Michael Hanselmann
          queue.release()
1134 b95479a5 Michael Hanselmann
          try:
1135 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1136 b95479a5 Michael Hanselmann
          finally:
1137 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1138 b95479a5 Michael Hanselmann
1139 b95479a5 Michael Hanselmann
          op.status = op_status
1140 b95479a5 Michael Hanselmann
          op.result = op_result
1141 b95479a5 Michael Hanselmann
1142 b95479a5 Michael Hanselmann
          assert not waitjob
1143 be760ba8 Michael Hanselmann
1144 47099cd1 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITING:
1145 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1146 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1147 be760ba8 Michael Hanselmann
        else:
1148 26d3fd2f Michael Hanselmann
          # Finalize opcode
1149 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1150 be760ba8 Michael Hanselmann
1151 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1152 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1153 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1154 26d3fd2f Michael Hanselmann
          else:
1155 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1156 be760ba8 Michael Hanselmann
1157 47099cd1 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1158 be760ba8 Michael Hanselmann
        finalize = False
1159 be760ba8 Michael Hanselmann
1160 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1161 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1162 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1163 be760ba8 Michael Hanselmann
1164 26d3fd2f Michael Hanselmann
        # Keep around for another round
1165 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1166 be760ba8 Michael Hanselmann
1167 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1168 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1169 be760ba8 Michael Hanselmann
1170 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1171 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1172 be760ba8 Michael Hanselmann
1173 be760ba8 Michael Hanselmann
      else:
1174 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1175 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1176 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1177 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1178 26d3fd2f Michael Hanselmann
1179 26d3fd2f Michael Hanselmann
        # Reset context
1180 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1181 26d3fd2f Michael Hanselmann
1182 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1183 26d3fd2f Michael Hanselmann
          finalize = False
1184 26d3fd2f Michael Hanselmann
1185 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1186 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1187 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1188 26d3fd2f Michael Hanselmann
1189 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1190 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1191 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1192 26d3fd2f Michael Hanselmann
          finalize = True
1193 be760ba8 Michael Hanselmann
1194 26d3fd2f Michael Hanselmann
          # Consistency check
1195 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1196 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1197 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1198 be760ba8 Michael Hanselmann
1199 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1200 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1201 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1202 26d3fd2f Michael Hanselmann
          finalize = True
1203 26d3fd2f Michael Hanselmann
1204 26d3fd2f Michael Hanselmann
        else:
1205 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1206 26d3fd2f Michael Hanselmann
1207 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1208 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1209 66bd7445 Michael Hanselmann
          finalize = True
1210 66bd7445 Michael Hanselmann
1211 66bd7445 Michael Hanselmann
        if finalize:
1212 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1213 66bd7445 Michael Hanselmann
          job.Finalize()
1214 26d3fd2f Michael Hanselmann
1215 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1216 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1217 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1218 be760ba8 Michael Hanselmann
1219 b95479a5 Michael Hanselmann
        assert not waitjob
1220 b95479a5 Michael Hanselmann
1221 66bd7445 Michael Hanselmann
        if finalize:
1222 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1223 75d81fc8 Michael Hanselmann
          return self.FINISHED
1224 be760ba8 Michael Hanselmann
1225 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1226 b95479a5 Michael Hanselmann
1227 75d81fc8 Michael Hanselmann
      if waitjob:
1228 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1229 75d81fc8 Michael Hanselmann
      else:
1230 75d81fc8 Michael Hanselmann
        return self.DEFER
1231 be760ba8 Michael Hanselmann
    finally:
1232 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1233 be760ba8 Michael Hanselmann
      queue.release()
1234 be760ba8 Michael Hanselmann
1235 be760ba8 Michael Hanselmann
1236 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
1237 df5a5730 Michael Hanselmann
  """Looks at a result from L{_JobProcessor} for a job.
1238 df5a5730 Michael Hanselmann

1239 df5a5730 Michael Hanselmann
  To be used in a L{_JobQueueWorker}.
1240 df5a5730 Michael Hanselmann

1241 df5a5730 Michael Hanselmann
  """
1242 df5a5730 Michael Hanselmann
  if result == _JobProcessor.FINISHED:
1243 df5a5730 Michael Hanselmann
    # Notify waiting jobs
1244 df5a5730 Michael Hanselmann
    depmgr.NotifyWaiters(job.id)
1245 df5a5730 Michael Hanselmann
1246 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.DEFER:
1247 df5a5730 Michael Hanselmann
    # Schedule again
1248 df5a5730 Michael Hanselmann
    raise workerpool.DeferTask(priority=job.CalcPriority())
1249 df5a5730 Michael Hanselmann
1250 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.WAITDEP:
1251 df5a5730 Michael Hanselmann
    # No-op, dependency manager will re-schedule
1252 df5a5730 Michael Hanselmann
    pass
1253 df5a5730 Michael Hanselmann
1254 df5a5730 Michael Hanselmann
  else:
1255 df5a5730 Michael Hanselmann
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1256 df5a5730 Michael Hanselmann
                                 (result, ))
1257 df5a5730 Michael Hanselmann
1258 df5a5730 Michael Hanselmann
1259 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
1260 031a3e57 Michael Hanselmann
  """The actual job workers.
1261 031a3e57 Michael Hanselmann

1262 031a3e57 Michael Hanselmann
  """
1263 b459a848 Andrea Spadaccini
  def RunTask(self, job): # pylint: disable=W0221
1264 e2715f69 Michael Hanselmann
    """Job executor.
1265 e2715f69 Michael Hanselmann

1266 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1267 ea03467c Iustin Pop
    @param job: the job to be processed
1268 ea03467c Iustin Pop

1269 e2715f69 Michael Hanselmann
    """
1270 f8a4adfa Michael Hanselmann
    assert job.writable, "Expected writable job"
1271 f8a4adfa Michael Hanselmann
1272 b95479a5 Michael Hanselmann
    # Ensure only one worker is active on a single job. If a job registers for
1273 b95479a5 Michael Hanselmann
    # a dependency job, and the other job notifies before the first worker is
1274 b95479a5 Michael Hanselmann
    # done, the job can end up in the tasklist more than once.
1275 b95479a5 Michael Hanselmann
    job.processor_lock.acquire()
1276 b95479a5 Michael Hanselmann
    try:
1277 b95479a5 Michael Hanselmann
      return self._RunTaskInner(job)
1278 b95479a5 Michael Hanselmann
    finally:
1279 b95479a5 Michael Hanselmann
      job.processor_lock.release()
1280 b95479a5 Michael Hanselmann
1281 b95479a5 Michael Hanselmann
  def _RunTaskInner(self, job):
1282 b95479a5 Michael Hanselmann
    """Executes a job.
1283 b95479a5 Michael Hanselmann

1284 b95479a5 Michael Hanselmann
    Must be called with per-job lock acquired.
1285 b95479a5 Michael Hanselmann

1286 b95479a5 Michael Hanselmann
    """
1287 be760ba8 Michael Hanselmann
    queue = job.queue
1288 be760ba8 Michael Hanselmann
    assert queue == self.pool.queue
1289 be760ba8 Michael Hanselmann
1290 0aeeb6e3 Michael Hanselmann
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1291 0aeeb6e3 Michael Hanselmann
    setname_fn(None)
1292 daba67c7 Michael Hanselmann
1293 be760ba8 Michael Hanselmann
    proc = mcpu.Processor(queue.context, job.id)
1294 be760ba8 Michael Hanselmann
1295 0aeeb6e3 Michael Hanselmann
    # Create wrapper for setting thread name
1296 0aeeb6e3 Michael Hanselmann
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1297 0aeeb6e3 Michael Hanselmann
                                    proc.ExecOpCode)
1298 0aeeb6e3 Michael Hanselmann
1299 df5a5730 Michael Hanselmann
    _EvaluateJobProcessorResult(queue.depmgr, job,
1300 df5a5730 Michael Hanselmann
                                _JobProcessor(queue, wrap_execop_fn, job)())
1301 75d81fc8 Michael Hanselmann
1302 0aeeb6e3 Michael Hanselmann
  @staticmethod
1303 0aeeb6e3 Michael Hanselmann
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1304 0aeeb6e3 Michael Hanselmann
    """Updates the worker thread name to include a short summary of the opcode.
1305 0aeeb6e3 Michael Hanselmann

1306 0aeeb6e3 Michael Hanselmann
    @param setname_fn: Callable setting worker thread name
1307 0aeeb6e3 Michael Hanselmann
    @param execop_fn: Callable for executing opcode (usually
1308 0aeeb6e3 Michael Hanselmann
                      L{mcpu.Processor.ExecOpCode})
1309 0aeeb6e3 Michael Hanselmann

1310 0aeeb6e3 Michael Hanselmann
    """
1311 0aeeb6e3 Michael Hanselmann
    setname_fn(op)
1312 0aeeb6e3 Michael Hanselmann
    try:
1313 0aeeb6e3 Michael Hanselmann
      return execop_fn(op, *args, **kwargs)
1314 0aeeb6e3 Michael Hanselmann
    finally:
1315 0aeeb6e3 Michael Hanselmann
      setname_fn(None)
1316 0aeeb6e3 Michael Hanselmann
1317 0aeeb6e3 Michael Hanselmann
  @staticmethod
1318 0aeeb6e3 Michael Hanselmann
  def _GetWorkerName(job, op):
1319 0aeeb6e3 Michael Hanselmann
    """Sets the worker thread name.
1320 0aeeb6e3 Michael Hanselmann

1321 0aeeb6e3 Michael Hanselmann
    @type job: L{_QueuedJob}
1322 0aeeb6e3 Michael Hanselmann
    @type op: L{opcodes.OpCode}
1323 0aeeb6e3 Michael Hanselmann

1324 0aeeb6e3 Michael Hanselmann
    """
1325 0aeeb6e3 Michael Hanselmann
    parts = ["Job%s" % job.id]
1326 0aeeb6e3 Michael Hanselmann
1327 0aeeb6e3 Michael Hanselmann
    if op:
1328 0aeeb6e3 Michael Hanselmann
      parts.append(op.TinySummary())
1329 0aeeb6e3 Michael Hanselmann
1330 0aeeb6e3 Michael Hanselmann
    return "/".join(parts)
1331 0aeeb6e3 Michael Hanselmann
1332 e2715f69 Michael Hanselmann
1333 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
1334 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
1335 ea03467c Iustin Pop

1336 ea03467c Iustin Pop
  """
1337 5bdce580 Michael Hanselmann
  def __init__(self, queue):
1338 0aeeb6e3 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("Jq",
1339 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
1340 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
1341 5bdce580 Michael Hanselmann
    self.queue = queue
1342 e2715f69 Michael Hanselmann
1343 e2715f69 Michael Hanselmann
1344 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1345 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1346 b95479a5 Michael Hanselmann

1347 b95479a5 Michael Hanselmann
  """
1348 b95479a5 Michael Hanselmann
  (WAIT,
1349 b95479a5 Michael Hanselmann
   ERROR,
1350 b95479a5 Michael Hanselmann
   CANCEL,
1351 b95479a5 Michael Hanselmann
   CONTINUE,
1352 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1353 b95479a5 Michael Hanselmann
1354 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1355 b95479a5 Michael Hanselmann
    """Initializes this class.
1356 b95479a5 Michael Hanselmann

1357 b95479a5 Michael Hanselmann
    """
1358 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1359 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1360 b95479a5 Michael Hanselmann
1361 b95479a5 Michael Hanselmann
    self._waiters = {}
1362 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1363 b95479a5 Michael Hanselmann
1364 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1365 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1366 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1367 fcb21ad7 Michael Hanselmann

1368 fcb21ad7 Michael Hanselmann
    @type requested: set
1369 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1370 fcb21ad7 Michael Hanselmann

1371 fcb21ad7 Michael Hanselmann
    """
1372 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1373 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1374 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1375 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1376 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1377 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1378 fcb21ad7 Michael Hanselmann
            if waiters]
1379 fcb21ad7 Michael Hanselmann
1380 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1381 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1382 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1383 b95479a5 Michael Hanselmann

1384 b95479a5 Michael Hanselmann
    """
1385 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1386 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1387 b95479a5 Michael Hanselmann
1388 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1389 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1390 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1391 b95479a5 Michael Hanselmann

1392 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1393 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1394 b95479a5 Michael Hanselmann

1395 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1396 b95479a5 Michael Hanselmann
    @param job: Job object
1397 76b62028 Iustin Pop
    @type dep_job_id: int
1398 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1399 b95479a5 Michael Hanselmann
    @type dep_status: list
1400 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1401 b95479a5 Michael Hanselmann

1402 b95479a5 Michael Hanselmann
    """
1403 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1404 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1405 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1406 b95479a5 Michael Hanselmann
1407 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1408 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1409 b95479a5 Michael Hanselmann
1410 b95479a5 Michael Hanselmann
    # Get status of dependency job
1411 b95479a5 Michael Hanselmann
    try:
1412 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1413 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1414 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1415 b95479a5 Michael Hanselmann
1416 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1417 b95479a5 Michael Hanselmann
1418 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1419 b95479a5 Michael Hanselmann
1420 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1421 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1422 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1423 b95479a5 Michael Hanselmann
      return (self.WAIT,
1424 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1425 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1426 b95479a5 Michael Hanselmann
1427 b95479a5 Michael Hanselmann
    # Remove from waiters list
1428 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1429 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1430 b95479a5 Michael Hanselmann
1431 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1432 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1433 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1434 b95479a5 Michael Hanselmann
1435 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1436 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1437 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1438 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1439 b95479a5 Michael Hanselmann
1440 b95479a5 Michael Hanselmann
    else:
1441 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1442 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1443 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1444 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1445 b95479a5 Michael Hanselmann
1446 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1447 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1448 37d76f1e Michael Hanselmann

1449 37d76f1e Michael Hanselmann
    """
1450 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1451 37d76f1e Michael Hanselmann
                   if not waiters]:
1452 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1453 37d76f1e Michael Hanselmann
1454 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1455 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1456 b95479a5 Michael Hanselmann

1457 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1458 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1459 76b62028 Iustin Pop
    @type job_id: int
1460 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1461 b95479a5 Michael Hanselmann

1462 b95479a5 Michael Hanselmann
    """
1463 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1464 b95479a5 Michael Hanselmann
1465 37d76f1e Michael Hanselmann
    self._lock.acquire()
1466 37d76f1e Michael Hanselmann
    try:
1467 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1468 37d76f1e Michael Hanselmann
1469 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1470 37d76f1e Michael Hanselmann
    finally:
1471 37d76f1e Michael Hanselmann
      self._lock.release()
1472 37d76f1e Michael Hanselmann
1473 b95479a5 Michael Hanselmann
    if jobs:
1474 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1475 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1476 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1477 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1478 b95479a5 Michael Hanselmann
1479 b95479a5 Michael Hanselmann
1480 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1481 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1482 ea03467c Iustin Pop

1483 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1484 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1485 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1486 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1487 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1488 ea03467c Iustin Pop

1489 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1490 f1da30e6 Michael Hanselmann

1491 6c881c52 Iustin Pop
  Example::
1492 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1493 6c881c52 Iustin Pop
    @_RequireOpenQueue
1494 6c881c52 Iustin Pop
    def Example(self):
1495 6c881c52 Iustin Pop
      pass
1496 db37da70 Michael Hanselmann

1497 6c881c52 Iustin Pop
  """
1498 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1499 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1500 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1501 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1502 6c881c52 Iustin Pop
  return wrapper
1503 db37da70 Michael Hanselmann
1504 db37da70 Michael Hanselmann
1505 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1506 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1507 c8d0be94 Michael Hanselmann

1508 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1509 c8d0be94 Michael Hanselmann

1510 c8d0be94 Michael Hanselmann
  """
1511 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1512 c8d0be94 Michael Hanselmann
    """Wrapper function.
1513 c8d0be94 Michael Hanselmann

1514 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1515 c8d0be94 Michael Hanselmann

1516 c8d0be94 Michael Hanselmann
    """
1517 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1518 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1519 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1520 c8d0be94 Michael Hanselmann
    if self._drained:
1521 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1522 6d5ea385 Michael Hanselmann
1523 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1524 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1525 6d5ea385 Michael Hanselmann
1526 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1527 c8d0be94 Michael Hanselmann
  return wrapper
1528 c8d0be94 Michael Hanselmann
1529 c8d0be94 Michael Hanselmann
1530 6c881c52 Iustin Pop
class JobQueue(object):
1531 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1532 db37da70 Michael Hanselmann

1533 6c881c52 Iustin Pop
  """
1534 85f03e0d Michael Hanselmann
  def __init__(self, context):
1535 ea03467c Iustin Pop
    """Constructor for JobQueue.
1536 ea03467c Iustin Pop

1537 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
1538 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
1539 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
1540 ea03467c Iustin Pop
    running).
1541 ea03467c Iustin Pop

1542 ea03467c Iustin Pop
    @type context: GanetiContext
1543 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
1544 ea03467c Iustin Pop
        data and other ganeti objects
1545 ea03467c Iustin Pop

1546 ea03467c Iustin Pop
    """
1547 5bdce580 Michael Hanselmann
    self.context = context
1548 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
1549 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
1550 f1da30e6 Michael Hanselmann
1551 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
1552 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
1553 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
1554 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
1555 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
1556 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
1557 ebb80afa Guido Trotter
1558 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
1559 ebb80afa Guido Trotter
    self.release = self._lock.release
1560 85f03e0d Michael Hanselmann
1561 6d5ea385 Michael Hanselmann
    # Accept jobs by default
1562 6d5ea385 Michael Hanselmann
    self._accepting_jobs = True
1563 6d5ea385 Michael Hanselmann
1564 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
1565 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
1566 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1567 f1da30e6 Michael Hanselmann
1568 04ab05ce Michael Hanselmann
    # Read serial file
1569 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
1570 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
1571 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
1572 c4beba1c Iustin Pop
1573 23752136 Michael Hanselmann
    # Get initial list of nodes
1574 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
1575 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
1576 59303563 Iustin Pop
                       if n.master_candidate)
1577 8e00939c Michael Hanselmann
1578 8e00939c Michael Hanselmann
    # Remove master node
1579 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
1580 23752136 Michael Hanselmann
1581 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
1582 23752136 Michael Hanselmann
1583 6d5ea385 Michael Hanselmann
    self._queue_size = None
1584 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1585 6d5ea385 Michael Hanselmann
    assert ht.TInt(self._queue_size)
1586 ff699aa9 Michael Hanselmann
    self._drained = jstore.CheckDrainFlag()
1587 20571a26 Guido Trotter
1588 b95479a5 Michael Hanselmann
    # Job dependencies
1589 b95479a5 Michael Hanselmann
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1590 b95479a5 Michael Hanselmann
                                        self._EnqueueJobs)
1591 fcb21ad7 Michael Hanselmann
    self.context.glm.AddToLockMonitor(self.depmgr)
1592 b95479a5 Michael Hanselmann
1593 85f03e0d Michael Hanselmann
    # Setup worker pool
1594 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
1595 85f03e0d Michael Hanselmann
    try:
1596 de9d02c7 Michael Hanselmann
      self._InspectQueue()
1597 de9d02c7 Michael Hanselmann
    except:
1598 de9d02c7 Michael Hanselmann
      self._wpool.TerminateWorkers()
1599 de9d02c7 Michael Hanselmann
      raise
1600 711b5124 Michael Hanselmann
1601 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1602 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
1603 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
1604 de9d02c7 Michael Hanselmann
    """Loads the whole job queue and resumes unfinished jobs.
1605 de9d02c7 Michael Hanselmann

1606 de9d02c7 Michael Hanselmann
    This function needs the lock here because WorkerPool.AddTask() may start a
1607 de9d02c7 Michael Hanselmann
    job while we're still doing our work.
1608 711b5124 Michael Hanselmann

1609 de9d02c7 Michael Hanselmann
    """
1610 de9d02c7 Michael Hanselmann
    logging.info("Inspecting job queue")
1611 de9d02c7 Michael Hanselmann
1612 7b5c4a69 Michael Hanselmann
    restartjobs = []
1613 7b5c4a69 Michael Hanselmann
1614 de9d02c7 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked()
1615 de9d02c7 Michael Hanselmann
    jobs_count = len(all_job_ids)
1616 de9d02c7 Michael Hanselmann
    lastinfo = time.time()
1617 de9d02c7 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1618 de9d02c7 Michael Hanselmann
      # Give an update every 1000 jobs or 10 seconds
1619 de9d02c7 Michael Hanselmann
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1620 de9d02c7 Michael Hanselmann
          idx == (jobs_count - 1)):
1621 de9d02c7 Michael Hanselmann
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1622 de9d02c7 Michael Hanselmann
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1623 711b5124 Michael Hanselmann
        lastinfo = time.time()
1624 94ed59a5 Iustin Pop
1625 de9d02c7 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1626 85f03e0d Michael Hanselmann
1627 de9d02c7 Michael Hanselmann
      # a failure in loading the job can cause 'None' to be returned
1628 de9d02c7 Michael Hanselmann
      if job is None:
1629 de9d02c7 Michael Hanselmann
        continue
1630 85f03e0d Michael Hanselmann
1631 de9d02c7 Michael Hanselmann
      status = job.CalcStatus()
1632 711b5124 Michael Hanselmann
1633 320d1daf Michael Hanselmann
      if status == constants.JOB_STATUS_QUEUED:
1634 7b5c4a69 Michael Hanselmann
        restartjobs.append(job)
1635 de9d02c7 Michael Hanselmann
1636 de9d02c7 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING,
1637 47099cd1 Michael Hanselmann
                      constants.JOB_STATUS_WAITING,
1638 de9d02c7 Michael Hanselmann
                      constants.JOB_STATUS_CANCELING):
1639 de9d02c7 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
1640 320d1daf Michael Hanselmann
1641 47099cd1 Michael Hanselmann
        if status == constants.JOB_STATUS_WAITING:
1642 320d1daf Michael Hanselmann
          # Restart job
1643 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1644 320d1daf Michael Hanselmann
          restartjobs.append(job)
1645 320d1daf Michael Hanselmann
        else:
1646 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1647 320d1daf Michael Hanselmann
                                "Unclean master daemon shutdown")
1648 45df0793 Michael Hanselmann
          job.Finalize()
1649 320d1daf Michael Hanselmann
1650 de9d02c7 Michael Hanselmann
        self.UpdateJobUnlocked(job)
1651 de9d02c7 Michael Hanselmann
1652 7b5c4a69 Michael Hanselmann
    if restartjobs:
1653 7b5c4a69 Michael Hanselmann
      logging.info("Restarting %s jobs", len(restartjobs))
1654 75d81fc8 Michael Hanselmann
      self._EnqueueJobsUnlocked(restartjobs)
1655 7b5c4a69 Michael Hanselmann
1656 de9d02c7 Michael Hanselmann
    logging.info("Job queue inspection finished")
1657 85f03e0d Michael Hanselmann
1658 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
1659 fb1ffbca Michael Hanselmann
    """Gets RPC runner with context.
1660 fb1ffbca Michael Hanselmann

1661 fb1ffbca Michael Hanselmann
    """
1662 fb1ffbca Michael Hanselmann
    return rpc.JobQueueRunner(self.context, address_list)
1663 fb1ffbca Michael Hanselmann
1664 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1665 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1666 99aabbed Iustin Pop
  def AddNode(self, node):
1667 99aabbed Iustin Pop
    """Register a new node with the queue.
1668 99aabbed Iustin Pop

1669 99aabbed Iustin Pop
    @type node: L{objects.Node}
1670 99aabbed Iustin Pop
    @param node: the node object to be added
1671 99aabbed Iustin Pop

1672 99aabbed Iustin Pop
    """
1673 99aabbed Iustin Pop
    node_name = node.name
1674 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1675 23752136 Michael Hanselmann
1676 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1677 fb1ffbca Michael Hanselmann
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1678 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1679 c8457ce7 Iustin Pop
    if msg:
1680 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1681 c8457ce7 Iustin Pop
                      node_name, msg)
1682 23752136 Michael Hanselmann
1683 59303563 Iustin Pop
    if not node.master_candidate:
1684 59303563 Iustin Pop
      # remove if existing, ignoring errors
1685 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1686 59303563 Iustin Pop
      # and skip the replication of the job ids
1687 59303563 Iustin Pop
      return
1688 59303563 Iustin Pop
1689 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1690 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1691 23752136 Michael Hanselmann
1692 d2e03a33 Michael Hanselmann
    # Upload current serial file
1693 e2b4a7ba Michael Hanselmann
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1694 d2e03a33 Michael Hanselmann
1695 fb1ffbca Michael Hanselmann
    # Static address list
1696 fb1ffbca Michael Hanselmann
    addrs = [node.primary_ip]
1697 fb1ffbca Michael Hanselmann
1698 d2e03a33 Michael Hanselmann
    for file_name in files:
1699 9f774ee8 Michael Hanselmann
      # Read file content
1700 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1701 9f774ee8 Michael Hanselmann
1702 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1703 cffbbae7 Michael Hanselmann
                             file_name, content)
1704 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1705 c8457ce7 Iustin Pop
      if msg:
1706 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1707 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1708 d2e03a33 Michael Hanselmann
1709 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1710 d2e03a33 Michael Hanselmann
1711 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1712 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1713 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1714 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1715 ea03467c Iustin Pop

1716 ea03467c Iustin Pop
    @type node_name: str
1717 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1718 ea03467c Iustin Pop

1719 ea03467c Iustin Pop
    """
1720 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1721 23752136 Michael Hanselmann
1722 7e950d31 Iustin Pop
  @staticmethod
1723 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1724 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1725 ea03467c Iustin Pop

1726 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1727 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1728 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1729 ea03467c Iustin Pop

1730 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1731 ea03467c Iustin Pop
    @type nodes: list
1732 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1733 ea03467c Iustin Pop
    @type failmsg: str
1734 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1735 ea03467c Iustin Pop

1736 ea03467c Iustin Pop
    """
1737 e74798c1 Michael Hanselmann
    failed = []
1738 e74798c1 Michael Hanselmann
    success = []
1739 e74798c1 Michael Hanselmann
1740 e74798c1 Michael Hanselmann
    for node in nodes:
1741 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1742 c8457ce7 Iustin Pop
      if msg:
1743 e74798c1 Michael Hanselmann
        failed.append(node)
1744 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1745 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1746 c8457ce7 Iustin Pop
      else:
1747 c8457ce7 Iustin Pop
        success.append(node)
1748 e74798c1 Michael Hanselmann
1749 e74798c1 Michael Hanselmann
    # +1 for the master node
1750 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1751 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1752 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1753 e74798c1 Michael Hanselmann
1754 99aabbed Iustin Pop
  def _GetNodeIp(self):
1755 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1756 99aabbed Iustin Pop

1757 ea03467c Iustin Pop
    @rtype: (list, list)
1758 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1759 ea03467c Iustin Pop
        names and the second one with the node addresses
1760 ea03467c Iustin Pop

1761 99aabbed Iustin Pop
    """
1762 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1763 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1764 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1765 99aabbed Iustin Pop
    return name_list, addr_list
1766 99aabbed Iustin Pop
1767 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1768 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1769 8e00939c Michael Hanselmann

1770 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1771 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1772 ea03467c Iustin Pop

1773 ea03467c Iustin Pop
    @type file_name: str
1774 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1775 ea03467c Iustin Pop
    @type data: str
1776 ea03467c Iustin Pop
    @param data: the new contents of the file
1777 4c36bdf5 Guido Trotter
    @type replicate: boolean
1778 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1779 ea03467c Iustin Pop

1780 8e00939c Michael Hanselmann
    """
1781 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1782 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1783 82b22e19 René Nussbaumer
                    gid=getents.masterd_gid)
1784 8e00939c Michael Hanselmann
1785 4c36bdf5 Guido Trotter
    if replicate:
1786 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1787 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1788 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1789 23752136 Michael Hanselmann
1790 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1791 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1792 ea03467c Iustin Pop

1793 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1794 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1795 ea03467c Iustin Pop

1796 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1797 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1798 ea03467c Iustin Pop

1799 ea03467c Iustin Pop
    """
1800 dd875d32 Michael Hanselmann
    # Rename them locally
1801 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1802 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1803 abc1f2ce Michael Hanselmann
1804 dd875d32 Michael Hanselmann
    # ... and on all nodes
1805 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1806 fb1ffbca Michael Hanselmann
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1807 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1808 abc1f2ce Michael Hanselmann
1809 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1810 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1811 f1da30e6 Michael Hanselmann

1812 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1813 f1da30e6 Michael Hanselmann

1814 009e73d0 Iustin Pop
    @type count: integer
1815 009e73d0 Iustin Pop
    @param count: how many serials to return
1816 76b62028 Iustin Pop
    @rtype: list of int
1817 76b62028 Iustin Pop
    @return: a list of job identifiers.
1818 f1da30e6 Michael Hanselmann

1819 f1da30e6 Michael Hanselmann
    """
1820 719f8fba Michael Hanselmann
    assert ht.TPositiveInt(count)
1821 719f8fba Michael Hanselmann
1822 f1da30e6 Michael Hanselmann
    # New number
1823 009e73d0 Iustin Pop
    serial = self._last_serial + count
1824 f1da30e6 Michael Hanselmann
1825 f1da30e6 Michael Hanselmann
    # Write to file
1826 e2b4a7ba Michael Hanselmann
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1827 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1828 f1da30e6 Michael Hanselmann
1829 1410a389 Michael Hanselmann
    result = [jstore.FormatJobID(v)
1830 3c88bf36 Michael Hanselmann
              for v in range(self._last_serial + 1, serial + 1)]
1831 3c88bf36 Michael Hanselmann
1832 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1833 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1834 f1da30e6 Michael Hanselmann
1835 3c88bf36 Michael Hanselmann
    assert len(result) == count
1836 3c88bf36 Michael Hanselmann
1837 009e73d0 Iustin Pop
    return result
1838 f1da30e6 Michael Hanselmann
1839 85f03e0d Michael Hanselmann
  @staticmethod
1840 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1841 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1842 ea03467c Iustin Pop

1843 ea03467c Iustin Pop
    @type job_id: str
1844 ea03467c Iustin Pop
    @param job_id: the job identifier
1845 ea03467c Iustin Pop
    @rtype: str
1846 ea03467c Iustin Pop
    @return: the path to the job file
1847 ea03467c Iustin Pop

1848 ea03467c Iustin Pop
    """
1849 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1850 f1da30e6 Michael Hanselmann
1851 1410a389 Michael Hanselmann
  @staticmethod
1852 1410a389 Michael Hanselmann
  def _GetArchivedJobPath(job_id):
1853 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1854 ea03467c Iustin Pop

1855 ea03467c Iustin Pop
    @type job_id: str
1856 ea03467c Iustin Pop
    @param job_id: the job identifier
1857 ea03467c Iustin Pop
    @rtype: str
1858 ea03467c Iustin Pop
    @return: the path to the archived job file
1859 ea03467c Iustin Pop

1860 ea03467c Iustin Pop
    """
1861 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1862 1410a389 Michael Hanselmann
                          jstore.GetArchiveDirectory(job_id),
1863 1410a389 Michael Hanselmann
                          "job-%s" % job_id)
1864 0cb94105 Michael Hanselmann
1865 cb66225d Michael Hanselmann
  @staticmethod
1866 cb66225d Michael Hanselmann
  def _GetJobIDsUnlocked(sort=True):
1867 911a495b Iustin Pop
    """Return all known job IDs.
1868 911a495b Iustin Pop

1869 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1870 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1871 ac0930b9 Iustin Pop
    extra IDs).
1872 ac0930b9 Iustin Pop

1873 85a1c57d Guido Trotter
    @type sort: boolean
1874 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1875 ea03467c Iustin Pop
    @rtype: list
1876 ea03467c Iustin Pop
    @return: the list of job IDs
1877 ea03467c Iustin Pop

1878 911a495b Iustin Pop
    """
1879 85a1c57d Guido Trotter
    jlist = []
1880 e2b4a7ba Michael Hanselmann
    for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
1881 cb66225d Michael Hanselmann
      m = constants.JOB_FILE_RE.match(filename)
1882 85a1c57d Guido Trotter
      if m:
1883 76b62028 Iustin Pop
        jlist.append(int(m.group(1)))
1884 85a1c57d Guido Trotter
    if sort:
1885 76b62028 Iustin Pop
      jlist.sort()
1886 f0d874fe Iustin Pop
    return jlist
1887 911a495b Iustin Pop
1888 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1889 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1890 ea03467c Iustin Pop

1891 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1892 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1893 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1894 ea03467c Iustin Pop

1895 76b62028 Iustin Pop
    @type job_id: int
1896 ea03467c Iustin Pop
    @param job_id: the job id
1897 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1898 ea03467c Iustin Pop
    @return: either None or the job object
1899 ea03467c Iustin Pop

1900 ea03467c Iustin Pop
    """
1901 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1902 5685c1a5 Michael Hanselmann
    if job:
1903 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1904 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
1905 5685c1a5 Michael Hanselmann
      return job
1906 ac0930b9 Iustin Pop
1907 3d6c5566 Guido Trotter
    try:
1908 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
1909 aa9f8167 Iustin Pop
      if job is None:
1910 aa9f8167 Iustin Pop
        return job
1911 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1912 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1913 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1914 3d6c5566 Guido Trotter
      if old_path == new_path:
1915 3d6c5566 Guido Trotter
        # job already archived (future case)
1916 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1917 3d6c5566 Guido Trotter
      else:
1918 3d6c5566 Guido Trotter
        # non-archived case
1919 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1920 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1921 3d6c5566 Guido Trotter
      return None
1922 162c8636 Guido Trotter
1923 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
1924 c0f6d0d8 Michael Hanselmann
1925 162c8636 Guido Trotter
    self._memcache[job_id] = job
1926 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1927 162c8636 Guido Trotter
    return job
1928 162c8636 Guido Trotter
1929 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1930 162c8636 Guido Trotter
    """Load the given job file from disk.
1931 162c8636 Guido Trotter

1932 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1933 162c8636 Guido Trotter

1934 76b62028 Iustin Pop
    @type job_id: int
1935 162c8636 Guido Trotter
    @param job_id: job identifier
1936 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1937 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1938 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1939 162c8636 Guido Trotter
    @return: either None or the job object
1940 162c8636 Guido Trotter

1941 162c8636 Guido Trotter
    """
1942 c0f6d0d8 Michael Hanselmann
    path_functions = [(self._GetJobPath, True)]
1943 194c8ca4 Michael Hanselmann
1944 194c8ca4 Michael Hanselmann
    if try_archived:
1945 c0f6d0d8 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, False))
1946 194c8ca4 Michael Hanselmann
1947 194c8ca4 Michael Hanselmann
    raw_data = None
1948 c0f6d0d8 Michael Hanselmann
    writable_default = None
1949 194c8ca4 Michael Hanselmann
1950 c0f6d0d8 Michael Hanselmann
    for (fn, writable_default) in path_functions:
1951 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
1952 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
1953 194c8ca4 Michael Hanselmann
      try:
1954 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
1955 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
1956 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
1957 194c8ca4 Michael Hanselmann
          raise
1958 194c8ca4 Michael Hanselmann
      else:
1959 194c8ca4 Michael Hanselmann
        break
1960 194c8ca4 Michael Hanselmann
1961 194c8ca4 Michael Hanselmann
    if not raw_data:
1962 194c8ca4 Michael Hanselmann
      return None
1963 13998ef2 Michael Hanselmann
1964 c0f6d0d8 Michael Hanselmann
    if writable is None:
1965 c0f6d0d8 Michael Hanselmann
      writable = writable_default
1966 c0f6d0d8 Michael Hanselmann
1967 94ed59a5 Iustin Pop
    try:
1968 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1969 c0f6d0d8 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable)
1970 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1971 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1972 94ed59a5 Iustin Pop
1973 ac0930b9 Iustin Pop
    return job
1974 f1da30e6 Michael Hanselmann
1975 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1976 0f9c08dc Guido Trotter
    """Load the given job file from disk.
1977 0f9c08dc Guido Trotter

1978 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1979 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
1980 0f9c08dc Guido Trotter
    exception is logged.
1981 0f9c08dc Guido Trotter

1982 76b62028 Iustin Pop
    @type job_id: int
1983 0f9c08dc Guido Trotter
    @param job_id: job identifier
1984 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1985 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1986 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
1987 0f9c08dc Guido Trotter
    @return: either None or the job object
1988 0f9c08dc Guido Trotter

1989 0f9c08dc Guido Trotter
    """
1990 0f9c08dc Guido Trotter
    try:
1991 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1992 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
1993 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
1994 0f9c08dc Guido Trotter
      return None
1995 0f9c08dc Guido Trotter
1996 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1997 20571a26 Guido Trotter
    """Update the queue size.
1998 20571a26 Guido Trotter

1999 20571a26 Guido Trotter
    """
2000 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2001 20571a26 Guido Trotter
2002 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2003 20571a26 Guido Trotter
  @_RequireOpenQueue
2004 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
2005 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
2006 3ccafd0e Iustin Pop

2007 ea03467c Iustin Pop
    @type drain_flag: boolean
2008 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
2009 ea03467c Iustin Pop

2010 3ccafd0e Iustin Pop
    """
2011 ff699aa9 Michael Hanselmann
    jstore.SetDrainFlag(drain_flag)
2012 20571a26 Guido Trotter
2013 20571a26 Guido Trotter
    self._drained = drain_flag
2014 20571a26 Guido Trotter
2015 3ccafd0e Iustin Pop
    return True
2016 3ccafd0e Iustin Pop
2017 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2018 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
2019 85f03e0d Michael Hanselmann
    """Create and store a new job.
2020 f1da30e6 Michael Hanselmann

2021 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
2022 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
2023 c3f0a12f Iustin Pop

2024 009e73d0 Iustin Pop
    @type job_id: job ID
2025 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
2026 c3f0a12f Iustin Pop
    @type ops: list
2027 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
2028 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
2029 7beb1e53 Guido Trotter
    @return: the job object to be queued
2030 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2031 e71c8147 Michael Hanselmann
    @raise errors.GenericError: If an opcode is not valid
2032 c3f0a12f Iustin Pop

2033 c3f0a12f Iustin Pop
    """
2034 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2035 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
2036 f87b405e Michael Hanselmann
2037 c0f6d0d8 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops, True)
2038 f1da30e6 Michael Hanselmann
2039 e71c8147 Michael Hanselmann
    # Check priority
2040 e71c8147 Michael Hanselmann
    for idx, op in enumerate(job.ops):
2041 e71c8147 Michael Hanselmann
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2042 e71c8147 Michael Hanselmann
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2043 e71c8147 Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2044 e71c8147 Michael Hanselmann
                                  " are %s" % (idx, op.priority, allowed))
2045 e71c8147 Michael Hanselmann
2046 b247c6fc Michael Hanselmann
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2047 b247c6fc Michael Hanselmann
      if not opcodes.TNoRelativeJobDependencies(dependencies):
2048 b247c6fc Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2049 b247c6fc Michael Hanselmann
                                  " match %s: %s" %
2050 b247c6fc Michael Hanselmann
                                  (idx, opcodes.TNoRelativeJobDependencies,
2051 b247c6fc Michael Hanselmann
                                   dependencies))
2052 b247c6fc Michael Hanselmann
2053 f1da30e6 Michael Hanselmann
    # Write to disk
2054 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
2055 f1da30e6 Michael Hanselmann
2056 20571a26 Guido Trotter
    self._queue_size += 1
2057 20571a26 Guido Trotter
2058 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
2059 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
2060 ac0930b9 Iustin Pop
2061 7beb1e53 Guido Trotter
    return job
2062 f1da30e6 Michael Hanselmann
2063 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2064 2971c913 Iustin Pop
  @_RequireOpenQueue
2065 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2066 2971c913 Iustin Pop
  def SubmitJob(self, ops):
2067 2971c913 Iustin Pop
    """Create and store a new job.
2068 2971c913 Iustin Pop

2069 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2070 2971c913 Iustin Pop

2071 2971c913 Iustin Pop
    """
2072 b247c6fc Michael Hanselmann
    (job_id, ) = self._NewSerialsUnlocked(1)
2073 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2074 7beb1e53 Guido Trotter
    return job_id
2075 2971c913 Iustin Pop
2076 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2077 2971c913 Iustin Pop
  @_RequireOpenQueue
2078 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2079 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
2080 2971c913 Iustin Pop
    """Create and store multiple jobs.
2081 2971c913 Iustin Pop

2082 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2083 2971c913 Iustin Pop

2084 2971c913 Iustin Pop
    """
2085 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2086 b247c6fc Michael Hanselmann
2087 b247c6fc Michael Hanselmann
    (results, added_jobs) = \
2088 b247c6fc Michael Hanselmann
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2089 7b5c4a69 Michael Hanselmann
2090 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked(added_jobs)
2091 2971c913 Iustin Pop
2092 2971c913 Iustin Pop
    return results
2093 2971c913 Iustin Pop
2094 b247c6fc Michael Hanselmann
  @staticmethod
2095 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
2096 b247c6fc Michael Hanselmann
    """Formats errors which occurred while submitting a job.
2097 b247c6fc Michael Hanselmann

2098 b247c6fc Michael Hanselmann
    """
2099 b247c6fc Michael Hanselmann
    return ("%s; opcodes %s" %
2100 b247c6fc Michael Hanselmann
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2101 b247c6fc Michael Hanselmann
2102 b247c6fc Michael Hanselmann
  @staticmethod
2103 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
2104 b247c6fc Michael Hanselmann
    """Resolves relative job IDs in dependencies.
2105 b247c6fc Michael Hanselmann

2106 b247c6fc Michael Hanselmann
    @type resolve_fn: callable
2107 b247c6fc Michael Hanselmann
    @param resolve_fn: Function to resolve a relative job ID
2108 b247c6fc Michael Hanselmann
    @type deps: list
2109 b247c6fc Michael Hanselmann
    @param deps: Dependencies
2110 b247c6fc Michael Hanselmann
    @rtype: list
2111 b247c6fc Michael Hanselmann
    @return: Resolved dependencies
2112 b247c6fc Michael Hanselmann

2113 b247c6fc Michael Hanselmann
    """
2114 b247c6fc Michael Hanselmann
    result = []
2115 b247c6fc Michael Hanselmann
2116 b247c6fc Michael Hanselmann
    for (dep_job_id, dep_status) in deps:
2117 b247c6fc Michael Hanselmann
      if ht.TRelativeJobId(dep_job_id):
2118 b247c6fc Michael Hanselmann
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2119 b247c6fc Michael Hanselmann
        try:
2120 b247c6fc Michael Hanselmann
          job_id = resolve_fn(dep_job_id)
2121 b247c6fc Michael Hanselmann
        except IndexError:
2122 b247c6fc Michael Hanselmann
          # Abort
2123 b247c6fc Michael Hanselmann
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2124 b247c6fc Michael Hanselmann
      else:
2125 b247c6fc Michael Hanselmann
        job_id = dep_job_id
2126 b247c6fc Michael Hanselmann
2127 b247c6fc Michael Hanselmann
      result.append((job_id, dep_status))
2128 b247c6fc Michael Hanselmann
2129 b247c6fc Michael Hanselmann
    return (True, result)
2130 b247c6fc Michael Hanselmann
2131 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2132 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2133 b247c6fc Michael Hanselmann

2134 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2135 b247c6fc Michael Hanselmann

2136 b247c6fc Michael Hanselmann
    """
2137 b247c6fc Michael Hanselmann
    results = []
2138 b247c6fc Michael Hanselmann
    added_jobs = []
2139 b247c6fc Michael Hanselmann
2140 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2141 b247c6fc Michael Hanselmann
      assert reljobid < 0
2142 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2143 b247c6fc Michael Hanselmann
2144 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2145 b247c6fc Michael Hanselmann
      for op in ops:
2146 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2147 b247c6fc Michael Hanselmann
          (status, data) = \
2148 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2149 b247c6fc Michael Hanselmann
                                         op.depends)
2150 b247c6fc Michael Hanselmann
          if not status:
2151 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2152 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2153 b247c6fc Michael Hanselmann
            break
2154 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2155 b247c6fc Michael Hanselmann
          op.depends = data
2156 b247c6fc Michael Hanselmann
      else:
2157 b247c6fc Michael Hanselmann
        try:
2158 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2159 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2160 b247c6fc Michael Hanselmann
          status = False
2161 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2162 b247c6fc Michael Hanselmann
        else:
2163 b247c6fc Michael Hanselmann
          status = True
2164 b247c6fc Michael Hanselmann
          data = job_id
2165 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2166 b247c6fc Michael Hanselmann
2167 b247c6fc Michael Hanselmann
      results.append((status, data))
2168 b247c6fc Michael Hanselmann
2169 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2170 b247c6fc Michael Hanselmann
2171 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2172 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2173 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2174 7b5c4a69 Michael Hanselmann

2175 7b5c4a69 Michael Hanselmann
    @type jobs: list
2176 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2177 7b5c4a69 Michael Hanselmann

2178 7b5c4a69 Michael Hanselmann
    """
2179 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2180 75d81fc8 Michael Hanselmann
2181 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2182 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2183 75d81fc8 Michael Hanselmann

2184 75d81fc8 Michael Hanselmann
    @type jobs: list
2185 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2186 75d81fc8 Michael Hanselmann

2187 75d81fc8 Michael Hanselmann
    """
2188 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2189 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2190 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
2191 7b5c4a69 Michael Hanselmann
2192 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2193 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2194 b95479a5 Michael Hanselmann

2195 76b62028 Iustin Pop
    @type job_id: int
2196 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2197 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2198 b95479a5 Michael Hanselmann

2199 b95479a5 Michael Hanselmann
    """
2200 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2201 b95479a5 Michael Hanselmann
2202 b95479a5 Michael Hanselmann
    # Try to load from disk
2203 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2204 c0f6d0d8 Michael Hanselmann
2205 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2206 b95479a5 Michael Hanselmann
2207 b95479a5 Michael Hanselmann
    if job:
2208 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2209 b95479a5 Michael Hanselmann
2210 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2211 b95479a5 Michael Hanselmann
2212 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2213 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
2214 ea03467c Iustin Pop
    """Update a job's on disk storage.
2215 ea03467c Iustin Pop

2216 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
2217 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
2218 ea03467c Iustin Pop
    nodes.
2219 ea03467c Iustin Pop

2220 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
2221 ea03467c Iustin Pop
    @param job: the changed job
2222 4c36bdf5 Guido Trotter
    @type replicate: boolean
2223 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
2224 ea03467c Iustin Pop

2225 ea03467c Iustin Pop
    """
2226 66bd7445 Michael Hanselmann
    if __debug__:
2227 66bd7445 Michael Hanselmann
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2228 66bd7445 Michael Hanselmann
      assert (finalized ^ (job.end_timestamp is None))
2229 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't update read-only job"
2230 66bd7445 Michael Hanselmann
2231 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
2232 a182a3ed Michael Hanselmann
    data = serializer.DumpJson(job.Serialize())
2233 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
2234 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
2235 ac0930b9 Iustin Pop
2236 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2237 5c735209 Iustin Pop
                        timeout):
2238 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
2239 6c5a7090 Michael Hanselmann

2240 76b62028 Iustin Pop
    @type job_id: int
2241 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
2242 6c5a7090 Michael Hanselmann
    @type fields: list of strings
2243 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
2244 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
2245 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
2246 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
2247 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
2248 5c735209 Iustin Pop
    @type timeout: float
2249 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
2250 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
2251 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
2252 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
2253 ea03467c Iustin Pop

2254 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
2255 ea03467c Iustin Pop
        we instead return a special value,
2256 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2257 ea03467c Iustin Pop
        as such by the clients
2258 6c5a7090 Michael Hanselmann

2259 6c5a7090 Michael Hanselmann
    """
2260 c0f6d0d8 Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2261 c0f6d0d8 Michael Hanselmann
                             writable=False)
2262 989a8bee Michael Hanselmann
2263 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
2264 989a8bee Michael Hanselmann
2265 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
2266 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
2267 dfe57c22 Michael Hanselmann
2268 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2269 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2270 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
2271 188c5e0a Michael Hanselmann
    """Cancels a job.
2272 188c5e0a Michael Hanselmann

2273 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
2274 ea03467c Iustin Pop

2275 76b62028 Iustin Pop
    @type job_id: int
2276 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
2277 188c5e0a Michael Hanselmann

2278 188c5e0a Michael Hanselmann
    """
2279 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
2280 188c5e0a Michael Hanselmann
2281 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2282 188c5e0a Michael Hanselmann
    if not job:
2283 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2284 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2285 fbf0262f Michael Hanselmann
2286 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Can't cancel read-only job"
2287 c0f6d0d8 Michael Hanselmann
2288 099b2870 Michael Hanselmann
    (success, msg) = job.Cancel()
2289 188c5e0a Michael Hanselmann
2290 099b2870 Michael Hanselmann
    if success:
2291 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2292 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2293 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2294 fbf0262f Michael Hanselmann
2295 099b2870 Michael Hanselmann
    return (success, msg)
2296 fbf0262f Michael Hanselmann
2297 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2298 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2299 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2300 c609f802 Michael Hanselmann

2301 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2302 25e7b43f Iustin Pop
    @param jobs: Job objects
2303 d7fd1f28 Michael Hanselmann
    @rtype: int
2304 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2305 c609f802 Michael Hanselmann

2306 c609f802 Michael Hanselmann
    """
2307 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2308 d7fd1f28 Michael Hanselmann
    rename_files = []
2309 d7fd1f28 Michael Hanselmann
    for job in jobs:
2310 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2311 c0f6d0d8 Michael Hanselmann
2312 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2313 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2314 d7fd1f28 Michael Hanselmann
        continue
2315 c609f802 Michael Hanselmann
2316 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2317 c609f802 Michael Hanselmann
2318 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2319 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2320 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2321 c609f802 Michael Hanselmann
2322 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2323 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2324 f1da30e6 Michael Hanselmann
2325 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2326 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2327 d7fd1f28 Michael Hanselmann
2328 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2329 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2330 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2331 20571a26 Guido Trotter
    # archived jobs to fix this.
2332 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2333 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2334 78d12585 Michael Hanselmann
2335 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2336 07cd723a Iustin Pop
  @_RequireOpenQueue
2337 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
2338 07cd723a Iustin Pop
    """Archives a job.
2339 07cd723a Iustin Pop

2340 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2341 ea03467c Iustin Pop

2342 76b62028 Iustin Pop
    @type job_id: int
2343 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
2344 78d12585 Michael Hanselmann
    @rtype: bool
2345 78d12585 Michael Hanselmann
    @return: Whether job was archived
2346 07cd723a Iustin Pop

2347 07cd723a Iustin Pop
    """
2348 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
2349 78d12585 Michael Hanselmann
2350 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2351 78d12585 Michael Hanselmann
    if not job:
2352 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2353 78d12585 Michael Hanselmann
      return False
2354 78d12585 Michael Hanselmann
2355 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
2356 07cd723a Iustin Pop
2357 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2358 07cd723a Iustin Pop
  @_RequireOpenQueue
2359 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
2360 07cd723a Iustin Pop
    """Archives all jobs based on age.
2361 07cd723a Iustin Pop

2362 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
2363 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
2364 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
2365 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
2366 07cd723a Iustin Pop

2367 07cd723a Iustin Pop
    @type age: int
2368 07cd723a Iustin Pop
    @param age: the minimum age in seconds
2369 07cd723a Iustin Pop

2370 07cd723a Iustin Pop
    """
2371 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
2372 07cd723a Iustin Pop
2373 07cd723a Iustin Pop
    now = time.time()
2374 f8ad5591 Michael Hanselmann
    end_time = now + timeout
2375 f8ad5591 Michael Hanselmann
    archived_count = 0
2376 f8ad5591 Michael Hanselmann
    last_touched = 0
2377 f8ad5591 Michael Hanselmann
2378 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
2379 d7fd1f28 Michael Hanselmann
    pending = []
2380 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
2381 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
2382 f8ad5591 Michael Hanselmann
2383 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
2384 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
2385 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
2386 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
2387 f8ad5591 Michael Hanselmann
        break
2388 f8ad5591 Michael Hanselmann
2389 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
2390 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
2391 f8ad5591 Michael Hanselmann
      if job:
2392 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
2393 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
2394 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
2395 f8ad5591 Michael Hanselmann
          else:
2396 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
2397 07cd723a Iustin Pop
        else:
2398 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
2399 f8ad5591 Michael Hanselmann
2400 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
2401 d7fd1f28 Michael Hanselmann
          pending.append(job)
2402 d7fd1f28 Michael Hanselmann
2403 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
2404 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
2405 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
2406 d7fd1f28 Michael Hanselmann
            pending = []
2407 f8ad5591 Michael Hanselmann
2408 d7fd1f28 Michael Hanselmann
    if pending:
2409 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
2410 07cd723a Iustin Pop
2411 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
2412 07cd723a Iustin Pop
2413 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
2414 e07f7f7a Michael Hanselmann
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2415 e07f7f7a Michael Hanselmann
                       namefield="id")
2416 e07f7f7a Michael Hanselmann
2417 e07f7f7a Michael Hanselmann
    job_ids = qobj.RequestedNames()
2418 e07f7f7a Michael Hanselmann
2419 e07f7f7a Michael Hanselmann
    list_all = (job_ids is None)
2420 e07f7f7a Michael Hanselmann
2421 e07f7f7a Michael Hanselmann
    if list_all:
2422 e07f7f7a Michael Hanselmann
      # Since files are added to/removed from the queue atomically, there's no
2423 e07f7f7a Michael Hanselmann
      # risk of getting the job ids in an inconsistent state.
2424 e07f7f7a Michael Hanselmann
      job_ids = self._GetJobIDsUnlocked()
2425 e07f7f7a Michael Hanselmann
2426 e07f7f7a Michael Hanselmann
    jobs = []
2427 e07f7f7a Michael Hanselmann
2428 e07f7f7a Michael Hanselmann
    for job_id in job_ids:
2429 e07f7f7a Michael Hanselmann
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2430 e07f7f7a Michael Hanselmann
      if job is not None or not list_all:
2431 e07f7f7a Michael Hanselmann
        jobs.append((job_id, job))
2432 e07f7f7a Michael Hanselmann
2433 e07f7f7a Michael Hanselmann
    return (qobj, jobs, list_all)
2434 e07f7f7a Michael Hanselmann
2435 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
2436 e07f7f7a Michael Hanselmann
    """Returns a list of jobs in queue.
2437 e07f7f7a Michael Hanselmann

2438 e07f7f7a Michael Hanselmann
    @type fields: sequence
2439 e07f7f7a Michael Hanselmann
    @param fields: List of wanted fields
2440 e07f7f7a Michael Hanselmann
    @type qfilter: None or query2 filter (list)
2441 e07f7f7a Michael Hanselmann
    @param qfilter: Query filter
2442 e07f7f7a Michael Hanselmann

2443 e07f7f7a Michael Hanselmann
    """
2444 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2445 e07f7f7a Michael Hanselmann
2446 76b62028 Iustin Pop
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2447 e07f7f7a Michael Hanselmann
2448 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
2449 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
2450 e2715f69 Michael Hanselmann

2451 ea03467c Iustin Pop
    @type job_ids: list
2452 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
2453 ea03467c Iustin Pop
    @type fields: list
2454 ea03467c Iustin Pop
    @param fields: names of fields to return
2455 ea03467c Iustin Pop
    @rtype: list
2456 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
2457 ea03467c Iustin Pop
        the requested fields
2458 e2715f69 Michael Hanselmann

2459 e2715f69 Michael Hanselmann
    """
2460 76b62028 Iustin Pop
    # backwards compat:
2461 76b62028 Iustin Pop
    job_ids = [int(jid) for jid in job_ids]
2462 e07f7f7a Michael Hanselmann
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2463 e2715f69 Michael Hanselmann
2464 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2465 e2715f69 Michael Hanselmann
2466 76b62028 Iustin Pop
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2467 e2715f69 Michael Hanselmann
2468 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2469 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
2470 6d5ea385 Michael Hanselmann
    """Prepare to stop the job queue.
2471 6d5ea385 Michael Hanselmann

2472 6d5ea385 Michael Hanselmann
    Disables execution of jobs in the workerpool and returns whether there are
2473 6d5ea385 Michael Hanselmann
    any jobs currently running. If the latter is the case, the job queue is not
2474 6d5ea385 Michael Hanselmann
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2475 6d5ea385 Michael Hanselmann
    be called without interfering with any job. Queued and unfinished jobs will
2476 6d5ea385 Michael Hanselmann
    be resumed next time.
2477 6d5ea385 Michael Hanselmann

2478 6d5ea385 Michael Hanselmann
    Once this function has been called no new job submissions will be accepted
2479 6d5ea385 Michael Hanselmann
    (see L{_RequireNonDrainedQueue}).
2480 6d5ea385 Michael Hanselmann

2481 6d5ea385 Michael Hanselmann
    @rtype: bool
2482 6d5ea385 Michael Hanselmann
    @return: Whether there are any running jobs
2483 6d5ea385 Michael Hanselmann

2484 6d5ea385 Michael Hanselmann
    """
2485 6d5ea385 Michael Hanselmann
    if self._accepting_jobs:
2486 6d5ea385 Michael Hanselmann
      self._accepting_jobs = False
2487 6d5ea385 Michael Hanselmann
2488 6d5ea385 Michael Hanselmann
      # Tell worker pool to stop processing pending tasks
2489 6d5ea385 Michael Hanselmann
      self._wpool.SetActive(False)
2490 6d5ea385 Michael Hanselmann
2491 6d5ea385 Michael Hanselmann
    return self._wpool.HasRunningTasks()
2492 6d5ea385 Michael Hanselmann
2493 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2494 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2495 e2715f69 Michael Hanselmann
  def Shutdown(self):
2496 e2715f69 Michael Hanselmann
    """Stops the job queue.
2497 e2715f69 Michael Hanselmann

2498 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2499 ea03467c Iustin Pop

2500 e2715f69 Michael Hanselmann
    """
2501 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
2502 85f03e0d Michael Hanselmann
2503 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
2504 a71f9c7d Guido Trotter
    self._queue_filelock = None