Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 70817cee

History | View | Annotate | Download (73.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 b95479a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 b95479a5 Michael Hanselmann
from ganeti import ht
60 a06c6ae8 Michael Hanselmann
from ganeti import query
61 a06c6ae8 Michael Hanselmann
from ganeti import qlang
62 e2715f69 Michael Hanselmann
63 fbf0262f Michael Hanselmann
64 1daae384 Iustin Pop
# Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
70 99bd4f0a Guido Trotter
71 498ae1cc Iustin Pop
72 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised by the opcode execution callbacks in this module when the opcode
  being processed has been marked as canceling.

  """
76 fbf0262f Michael Hanselmann
77 fbf0262f Michael Hanselmann
78 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
86 70552c46 Michael Hanselmann
87 70552c46 Michael Hanselmann
88 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery(object):
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    @type fields: list
    @param fields: names of the job fields to be returned by each query

    """
    # The query object is built once and reused for every call
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    @param job: job object; its C{id} attribute is used as the query key
    @return: the first (and only) row of the old-style query result

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
105 a06c6ae8 Michael Hanselmann
106 a06c6ae8 Michael Hanselmann
107 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar priority: the opcode's current priority
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The C{input}, C{status}, C{result} and C{log} keys are mandatory; missing
    timestamps default to C{None} and a missing priority to the default
    opcode priority.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # Bypass __init__, all attributes are taken from the serialized state
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp")
    obj.exec_timestamp = state.get("exec_timestamp")
    obj.end_timestamp = state.get("end_timestamp")
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
180 f1048938 Iustin Pop
181 e2715f69 Michael Hanselmann
182 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: serial number of the most recent log entry (0 if there
      are none yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: if C{ops} is empty

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    These attributes are not part of the serialized state and are therefore
    set both by the constructor and by L{Restore}.

    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    """Returns a description with class name, job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # Bypass __init__, all attributes are taken from the serialized state
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp")
    obj.start_timestamp = state.get("start_timestamp")
    obj.end_timestamp = state.get("end_timestamp")

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # Recover the highest log serial used so far across all opcodes
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped; if all opcodes were successful,
        the job status is success
      - the first opcode found in canceling, error or canceled status
        determines the job status and ends the iteration
      - otherwise, the last opcode in waiting or running status
        determines the job status
      - otherwise (only queued opcodes remain), the job status is queued

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job-level status, like all other branches do
        # NOTE(review): assumes constants.JOB_STATUS_CANCELED exists -- confirm
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int
    @return: the lowest priority value found among the unfinished opcodes

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # The first element of every log entry is its serial number
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes may only precede the unfinished ones
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Only the end timestamp is recorded here.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    A queued job is canceled and finalized right away, a waiting job is only
    marked as canceling; jobs in any other status are left untouched.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
459 099b2870 Michael Hanselmann
460 f1048938 Iustin Pop
461 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks used while executing one opcode of a queued job.

  The methods update the opcode/job objects given to the constructor and
  write the changes back through the job queue.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    @raise CancelJob: if the opcode's status is "canceling"

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    @raise CancelJob: if the opcode was marked as canceling in the meantime

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    @param timestamp: timestamp to be stored with the log entry
    @param log_type: type of the log entry
    @param log_msg: the log message

    """
    # The serial is incremented first, hence entries are numbered from 1
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the entry is logged with
    type L{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    assert 0 < len(args) < 3, "Expected (message) or (type, message)"

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    @raise CancelJob: if the opcode's status is "canceling"

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)
559 6a373640 Michael Hanselmann
560 031a3e57 Michael Hanselmann
561 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable deciding whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object, must be read-only
    @rtype: tuple or None
    @return: C{(job_info, log_entries)} if the job is no longer queued,
      waiting or running, if its info differs from the previous info or if
      there are log entries whose first serial differs from the previous
      serial; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # A JSON round trip can change types (e.g. tuple becomes list) or lose
    # precision. The data received from the client went through the same
    # conversion, hence it's applied here as well; otherwise the comparison
    # below could fail without the data being significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    active_states = (constants.JOB_STATUS_QUEUED,
                     constants.JOB_STATUS_RUNNING,
                     constants.JOB_STATUS_WAITING)

    if status not in active_states:
      # Don't even try to wait if the job is no longer running, there will
      # be no changes.
      changed = True
    elif job_info != self._prev_job_info:
      changed = True
    elif log_entries and self._prev_log_serial != log_entries[0][0]:
      changed = True
    else:
      changed = False

    if not changed:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
613 989a8bee Michael Hanselmann
614 989a8bee Michael Hanselmann
615 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  """Waits for changes to a single job file using inotify.

  """
  def __init__(self, filename):
    """Initializes this class.

    Sets up the watch manager, the single-file event handler and the
    notifier, then enables the handler.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = (
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify,
                                           filename))
    self._notifier = (
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler))

    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically, hence the
      # notifier is stopped before passing the error on
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Re-enables the event handler if it is reported as disabled.

    @param notifier_enabled: Whether the handler is still enabled

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is given in seconds and scaled by 1000 for the notifier
    got_events = notifier.check_events(timeout * 1000)
    if got_events:
      notifier.read_events()

    # Events are processed even if none were reported by the check above
    notifier.process_events()

    return got_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
663 989a8bee Michael Hanselmann
664 989a8bee Michael Hanselmann
665 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
666 989a8bee Michael Hanselmann
  def __init__(self, filename):
667 989a8bee Michael Hanselmann
    """Initializes this class.
668 989a8bee Michael Hanselmann

669 989a8bee Michael Hanselmann
    @type filename: string
670 989a8bee Michael Hanselmann
    @param filename: Path to job file
671 989a8bee Michael Hanselmann

672 989a8bee Michael Hanselmann
    """
673 989a8bee Michael Hanselmann
    self._filewaiter = None
674 989a8bee Michael Hanselmann
    self._filename = filename
675 6c2549d6 Guido Trotter
676 989a8bee Michael Hanselmann
  def Wait(self, timeout):
677 989a8bee Michael Hanselmann
    """Waits for a job to change.
678 6c2549d6 Guido Trotter

679 989a8bee Michael Hanselmann
    @type timeout: float
680 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
681 989a8bee Michael Hanselmann
    @return: Whether there have been events
682 989a8bee Michael Hanselmann

683 989a8bee Michael Hanselmann
    """
684 989a8bee Michael Hanselmann
    if self._filewaiter:
685 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
686 989a8bee Michael Hanselmann
687 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
688 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
689 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
690 989a8bee Michael Hanselmann
    # race condition.
691 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
692 989a8bee Michael Hanselmann
693 989a8bee Michael Hanselmann
    return True
694 989a8bee Michael Hanselmann
695 989a8bee Michael Hanselmann
  def Close(self):
696 989a8bee Michael Hanselmann
    """Closes underlying waiter.
697 989a8bee Michael Hanselmann

698 989a8bee Michael Hanselmann
    """
699 989a8bee Michael Hanselmann
    if self._filewaiter:
700 989a8bee Michael Hanselmann
      self._filewaiter.Close()
701 989a8bee Michael Hanselmann
702 989a8bee Michael Hanselmann
703 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the job status and log serial a client has seen last, this class
  reports back once the current job status differs.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job and checks it for changes once.

    @param counter: Iterator of attempt numbers; a value greater than zero
      means this is not the first check
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning C{None} for an unchanged job
    @return: The value returned by C{check_fn} if it is not C{None}
    @raise errors.JobLost: if the job could not be loaded
    @raise utils.RetryAgain: if C{check_fn} returned C{None}

    """
    attempt = counter.next()
    if attempt > 0:
      # Not the first check, so the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    changes = check_fn(job)
    if changes is not None:
      return changes

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: Result of the change check if the job changed, C{None} on
      L{errors.InotifyError} or L{errors.JobLost} and
      L{constants.JOB_NOTCHANGED} if the retry timed out

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        poll_fn = compat.partial(self._CheckForChanges,
                                 counter, job_load_fn, check_fn)
        return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # The waiter is closed no matter how the retry loop ended
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
761 6c2549d6 Guido Trotter
762 6c2549d6 Guido Trotter
763 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Anything which isn't an instance of L{errors.GenericError} is wrapped in
  an L{errors.OpExecError} carrying C{str(err)} before being encoded.

  @param err: Exception to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
773 6760e4ed Michael Hanselmann
774 6760e4ed Michael Hanselmann
775 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
776 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
777 26d3fd2f Michael Hanselmann
    """Initializes this class.
778 26d3fd2f Michael Hanselmann

779 26d3fd2f Michael Hanselmann
    """
780 26d3fd2f Michael Hanselmann
    self._fn = fn
781 26d3fd2f Michael Hanselmann
    self._next = None
782 26d3fd2f Michael Hanselmann
783 26d3fd2f Michael Hanselmann
  def _Advance(self):
784 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
785 26d3fd2f Michael Hanselmann

786 26d3fd2f Michael Hanselmann
    """
787 26d3fd2f Michael Hanselmann
    if self._next is None:
788 26d3fd2f Michael Hanselmann
      self._next = self._fn()
789 26d3fd2f Michael Hanselmann
790 26d3fd2f Michael Hanselmann
  def Peek(self):
791 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
792 26d3fd2f Michael Hanselmann

793 26d3fd2f Michael Hanselmann
    """
794 26d3fd2f Michael Hanselmann
    self._Advance()
795 26d3fd2f Michael Hanselmann
    return self._next
796 26d3fd2f Michael Hanselmann
797 26d3fd2f Michael Hanselmann
  def Next(self):
798 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
799 26d3fd2f Michael Hanselmann

800 26d3fd2f Michael Hanselmann
    """
801 26d3fd2f Michael Hanselmann
    self._Advance()
802 26d3fd2f Michael Hanselmann
    result = self._next
803 26d3fd2f Michael Hanselmann
    self._next = None
804 26d3fd2f Michael Hanselmann
    return result
805 26d3fd2f Michael Hanselmann
806 26d3fd2f Michael Hanselmann
807 b80cc518 Michael Hanselmann
class _OpExecContext:
  """Per-opcode execution state used while processing a job.

  Holds the opcode, its index, a log prefix, the opcode summary, a private
  copy of the opcode's job dependencies and the lock timeout strategy.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: Opcode object; its C{input} must provide C{Summary()}
    @param index: Index of the opcode
    @param log_prefix: Prefix for log messages about this opcode
    @type timeout_strategy_factory: callable
    @param timeout_strategy_factory: Callable returning an object with a
      C{NextAttempt} method

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify. The value is read through the same
    # attribute name used for checking its presence, so the check and the
    # copy can't refer to different attributes.
    depends = getattr(op.input, opcodes.DEPEND_ATTR, None)
    if depends:
      self.jobdeps = depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: C{True} if the opcode's priority was increased (in which case
      the timeout strategy was also reset), C{False} otherwise

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      # A numerically smaller value is a higher priority
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
857 26d3fd2f Michael Hanselmann
858 b80cc518 Michael Hanselmann
859 be760ba8 Michael Hanselmann
class _JobProcessor(object):
860 75d81fc8 Michael Hanselmann
  (DEFER,
861 75d81fc8 Michael Hanselmann
   WAITDEP,
862 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
863 75d81fc8 Michael Hanselmann
864 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    @param queue: Queue object
    @type opexec_fn: callable
    @param opexec_fn: Function used to execute an opcode
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout
      strategy

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
873 be760ba8 Michael Hanselmann
874 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    Opcodes whose status is finalized are skipped (and logged).

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Execution context for the first opcode not yet finalized
    @raise errors.ProgrammerError: if an opcode is marked as running or if
      no opcode is left

    """
    # The iterator is stored on the job and works as some sort of a cache to
    # speed up locating the next opcode for future lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx

    # Iterator exhausted without finding an opcode to run
    raise errors.ProgrammerError("Called for a finished job")
913 be760ba8 Michael Hanselmann
914 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The opcode's result is reset and its start timestamp set if missing.
    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or one of the timestamps was changed

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    status_changed = (op.status == constants.OP_STATUS_QUEUED)
    if status_changed:
      op.status = constants.OP_STATUS_WAITING

    op_started = (op.start_timestamp is None)
    if op_started:
      op.start_timestamp = TimeStampNow()

    job_started = (job.start_timestamp is None)
    if job_started:
      # The job starts with the opcode
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return (status_changed or op_started or job_started)
949 be760ba8 Michael Hanselmann
950 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are handled in order; those which allow continuing are
    removed from C{opctx.jobdeps}. Processing stops at the first dependency
    requiring to wait, cancelling this job or failing the opcode.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker
    @raise errors.ProgrammerError: if the dependency manager returns an
      unknown result

    """
    op = opctx.op
    depmgr_cls = _JobDependencyManager

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == depmgr_cls.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)
        continue

      if depresult == depmgr_cls.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if depresult == depmgr_cls.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (depmgr_cls.WRONGSTATUS, depmgr_cls.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No dependencies (left) to wait for
    return False
1005 b95479a5 Michael Hanselmann
1006 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1007 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1008 be760ba8 Michael Hanselmann

1009 be760ba8 Michael Hanselmann
    """
1010 b80cc518 Michael Hanselmann
    op = opctx.op
1011 b80cc518 Michael Hanselmann
1012 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1013 be760ba8 Michael Hanselmann
1014 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1015 26d3fd2f Michael Hanselmann
1016 be760ba8 Michael Hanselmann
    try:
1017 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1018 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1019 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1020 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1021 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1022 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1023 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1024 9e49dfc5 Michael Hanselmann
1025 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1026 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1027 9e49dfc5 Michael Hanselmann
1028 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1029 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1030 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1031 9e49dfc5 Michael Hanselmann
1032 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1033 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1034 be760ba8 Michael Hanselmann
    except CancelJob:
1035 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1036 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1037 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1038 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1039 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1040 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1041 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1042 be760ba8 Michael Hanselmann
    else:
1043 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1044 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1045 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1046 be760ba8 Michael Hanselmann
1047 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1048 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1049 be760ba8 Michael Hanselmann

1050 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1051 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1052 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1053 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1054 be760ba8 Michael Hanselmann

1055 be760ba8 Michael Hanselmann
    """
1056 be760ba8 Michael Hanselmann
    queue = self.queue
1057 be760ba8 Michael Hanselmann
    job = self.job
1058 be760ba8 Michael Hanselmann
1059 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1060 be760ba8 Michael Hanselmann
1061 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1062 be760ba8 Michael Hanselmann
    try:
1063 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1064 be760ba8 Michael Hanselmann
1065 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1066 c0f6d0d8 Michael Hanselmann
1067 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1068 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1069 75d81fc8 Michael Hanselmann
        return self.FINISHED
1070 66bd7445 Michael Hanselmann
1071 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1072 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1073 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1074 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1075 26d3fd2f Michael Hanselmann
      else:
1076 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1077 26d3fd2f Michael Hanselmann
          _nextop_fn()
1078 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1079 26d3fd2f Michael Hanselmann
1080 b80cc518 Michael Hanselmann
      op = opctx.op
1081 be760ba8 Michael Hanselmann
1082 be760ba8 Michael Hanselmann
      # Consistency check
1083 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1084 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1085 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1086 be760ba8 Michael Hanselmann
1087 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1088 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1089 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1090 be760ba8 Michael Hanselmann
1091 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1092 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1093 26d3fd2f Michael Hanselmann
1094 b95479a5 Michael Hanselmann
      waitjob = None
1095 b95479a5 Michael Hanselmann
1096 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1097 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1098 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1099 30c945d0 Michael Hanselmann
1100 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1101 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1102 5fd6b694 Michael Hanselmann
          # Write to disk
1103 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1104 be760ba8 Michael Hanselmann
1105 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1106 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1107 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1108 b95479a5 Michael Hanselmann
        assert waitjob is None
1109 b95479a5 Michael Hanselmann
1110 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1111 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1112 be760ba8 Michael Hanselmann
1113 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1114 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1115 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1116 be760ba8 Michael Hanselmann
1117 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1118 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1119 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1120 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1121 be760ba8 Michael Hanselmann
1122 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1123 b95479a5 Michael Hanselmann
1124 b95479a5 Michael Hanselmann
          queue.release()
1125 b95479a5 Michael Hanselmann
          try:
1126 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1127 b95479a5 Michael Hanselmann
          finally:
1128 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1129 b95479a5 Michael Hanselmann
1130 b95479a5 Michael Hanselmann
          op.status = op_status
1131 b95479a5 Michael Hanselmann
          op.result = op_result
1132 b95479a5 Michael Hanselmann
1133 b95479a5 Michael Hanselmann
          assert not waitjob
1134 be760ba8 Michael Hanselmann
1135 47099cd1 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITING:
1136 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1137 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1138 be760ba8 Michael Hanselmann
        else:
1139 26d3fd2f Michael Hanselmann
          # Finalize opcode
1140 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1141 be760ba8 Michael Hanselmann
1142 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1143 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1144 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1145 26d3fd2f Michael Hanselmann
          else:
1146 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1147 be760ba8 Michael Hanselmann
1148 47099cd1 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1149 be760ba8 Michael Hanselmann
        finalize = False
1150 be760ba8 Michael Hanselmann
1151 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1152 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1153 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1154 be760ba8 Michael Hanselmann
1155 26d3fd2f Michael Hanselmann
        # Keep around for another round
1156 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1157 be760ba8 Michael Hanselmann
1158 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1159 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1160 be760ba8 Michael Hanselmann
1161 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1162 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1163 be760ba8 Michael Hanselmann
1164 be760ba8 Michael Hanselmann
      else:
1165 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1166 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1167 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1168 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1169 26d3fd2f Michael Hanselmann
1170 26d3fd2f Michael Hanselmann
        # Reset context
1171 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1172 26d3fd2f Michael Hanselmann
1173 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1174 26d3fd2f Michael Hanselmann
          finalize = False
1175 26d3fd2f Michael Hanselmann
1176 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1177 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1178 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1179 26d3fd2f Michael Hanselmann
1180 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1181 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1182 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1183 26d3fd2f Michael Hanselmann
          finalize = True
1184 be760ba8 Michael Hanselmann
1185 26d3fd2f Michael Hanselmann
          # Consistency check
1186 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1187 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1188 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1189 be760ba8 Michael Hanselmann
1190 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1191 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1192 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1193 26d3fd2f Michael Hanselmann
          finalize = True
1194 26d3fd2f Michael Hanselmann
1195 26d3fd2f Michael Hanselmann
        else:
1196 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1197 26d3fd2f Michael Hanselmann
1198 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1199 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1200 66bd7445 Michael Hanselmann
          finalize = True
1201 66bd7445 Michael Hanselmann
1202 66bd7445 Michael Hanselmann
        if finalize:
1203 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1204 66bd7445 Michael Hanselmann
          job.Finalize()
1205 26d3fd2f Michael Hanselmann
1206 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1207 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1208 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1209 be760ba8 Michael Hanselmann
1210 b95479a5 Michael Hanselmann
        assert not waitjob
1211 b95479a5 Michael Hanselmann
1212 66bd7445 Michael Hanselmann
        if finalize:
1213 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1214 75d81fc8 Michael Hanselmann
          return self.FINISHED
1215 be760ba8 Michael Hanselmann
1216 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1217 b95479a5 Michael Hanselmann
1218 75d81fc8 Michael Hanselmann
      if waitjob:
1219 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1220 75d81fc8 Michael Hanselmann
      else:
1221 75d81fc8 Michael Hanselmann
        return self.DEFER
1222 be760ba8 Michael Hanselmann
    finally:
1223 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1224 be760ba8 Michael Hanselmann
      queue.release()
1225 be760ba8 Michael Hanselmann
1226 be760ba8 Michael Hanselmann
1227 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts on the status returned by L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    with the job ID when the job has finished
  @param job: Job which was processed; only C{id} and C{CalcPriority} are used
  @param result: Status returned by the job processor, one of
    C{_JobProcessor.FINISHED}, C{_JobProcessor.DEFER} or
    C{_JobProcessor.WAITDEP}
  @raise workerpool.DeferTask: if the status is C{DEFER}, carrying the
    priority calculated by the job
  @raise errors.ProgrammerError: if the status is none of the known values

  """
  if result == _JobProcessor.FINISHED:
    # Jobs waiting for this one can be notified now
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Ask for the job to be scheduled again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do here, the dependency manager will re-schedule the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1248 df5a5730 Michael Hanselmann
1249 df5a5730 Michael Hanselmann
1250 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker class processing jobs handed out by the worker pool.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding its per-job processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @return: Whatever L{_RunTaskInner} returns

    """
    assert job.writable, "Expected writable job"

    # A job registered for a dependency can be notified (and thereby end up
    # in the tasklist again) before the worker currently handling it is done.
    # Holding the per-job lock ensures only one worker is active on a job.
    processor_lock = job.processor_lock
    processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    def setname_fn(op):
      """Names the task after the job and, if given, the opcode."""
      return self.SetTaskName(self._GetWorkerName(job, op))

    # No opcode is being executed yet
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcodes are executed through a wrapper updating the thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    depmgr = queue.depmgr
    result = _JobProcessor(queue, wrap_execop_fn, job)()

    _EvaluateJobProcessorResult(depmgr, job, result)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode while the thread name includes its short summary.

    The name is set via C{setname_fn} before C{execop_fn} is called and reset
    (by passing C{None}) afterwards, even if the call raises an exception.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode, passed to both callables
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      result = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

    return result

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @param op: Opcode whose C{TinySummary} is appended; may be C{None}
    @return: C{"Job<id>"}, followed by C{"/<summary>"} if an opcode is given

    """
    job_part = "Job%s" % job.id

    if not op:
      return job_part

    return "/".join([job_part, op.TinySummary()])
1322 0aeeb6e3 Michael Hanselmann
1323 e2715f69 Michael Hanselmann
1324 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool for job processing, using L{_JobQueueWorker} as worker class.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it belongs to.

    The base class is set up with the name C{"Jq"}, C{JOBQUEUE_THREADS} and
    L{_JobQueueWorker}.

    @param queue: Job queue object, stored as C{self.queue}

    """
    base_init = super(_JobQueueWorkerPool, self).__init__
    base_init("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    self.queue = queue
1333 e2715f69 Michael Hanselmann
1334 e2715f69 Michael Hanselmann
1335 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1336 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1337 b95479a5 Michael Hanselmann

1338 b95479a5 Michael Hanselmann
  """
1339 b95479a5 Michael Hanselmann
  (WAIT,
1340 b95479a5 Michael Hanselmann
   ERROR,
1341 b95479a5 Michael Hanselmann
   CANCEL,
1342 b95479a5 Michael Hanselmann
   CONTINUE,
1343 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1344 b95479a5 Michael Hanselmann
1345 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1346 b95479a5 Michael Hanselmann
    """Initializes this class.
1347 b95479a5 Michael Hanselmann

1348 b95479a5 Michael Hanselmann
    """
1349 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1350 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1351 b95479a5 Michael Hanselmann
1352 b95479a5 Michael Hanselmann
    self._waiters = {}
1353 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1354 b95479a5 Michael Hanselmann
1355 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1356 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1357 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1358 fcb21ad7 Michael Hanselmann

1359 fcb21ad7 Michael Hanselmann
    @type requested: set
1360 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1361 fcb21ad7 Michael Hanselmann

1362 fcb21ad7 Michael Hanselmann
    """
1363 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1364 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1365 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1366 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1367 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1368 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1369 fcb21ad7 Michael Hanselmann
            if waiters]
1370 fcb21ad7 Michael Hanselmann
1371 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1372 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1373 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1374 b95479a5 Michael Hanselmann

1375 b95479a5 Michael Hanselmann
    """
1376 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1377 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1378 b95479a5 Michael Hanselmann
1379 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1380 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1381 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1382 b95479a5 Michael Hanselmann

1383 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1384 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1385 b95479a5 Michael Hanselmann

1386 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1387 b95479a5 Michael Hanselmann
    @param job: Job object
1388 b95479a5 Michael Hanselmann
    @type dep_job_id: string
1389 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1390 b95479a5 Michael Hanselmann
    @type dep_status: list
1391 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1392 b95479a5 Michael Hanselmann

1393 b95479a5 Michael Hanselmann
    """
1394 b95479a5 Michael Hanselmann
    assert ht.TString(job.id)
1395 b95479a5 Michael Hanselmann
    assert ht.TString(dep_job_id)
1396 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1397 b95479a5 Michael Hanselmann
1398 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1399 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1400 b95479a5 Michael Hanselmann
1401 b95479a5 Michael Hanselmann
    # Get status of dependency job
1402 b95479a5 Michael Hanselmann
    try:
1403 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1404 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1405 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1406 b95479a5 Michael Hanselmann
1407 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1408 b95479a5 Michael Hanselmann
1409 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1410 b95479a5 Michael Hanselmann
1411 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1412 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1413 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1414 b95479a5 Michael Hanselmann
      return (self.WAIT,
1415 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1416 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1417 b95479a5 Michael Hanselmann
1418 b95479a5 Michael Hanselmann
    # Remove from waiters list
1419 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1420 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1421 b95479a5 Michael Hanselmann
1422 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1423 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1424 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1425 b95479a5 Michael Hanselmann
1426 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1427 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1428 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1429 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1430 b95479a5 Michael Hanselmann
1431 b95479a5 Michael Hanselmann
    else:
1432 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1433 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1434 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1435 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1436 b95479a5 Michael Hanselmann
1437 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1438 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1439 37d76f1e Michael Hanselmann

1440 37d76f1e Michael Hanselmann
    """
1441 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1442 37d76f1e Michael Hanselmann
                   if not waiters]:
1443 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1444 37d76f1e Michael Hanselmann
1445 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1446 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1447 b95479a5 Michael Hanselmann

1448 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1449 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1450 b95479a5 Michael Hanselmann
    @type job_id: string
1451 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1452 b95479a5 Michael Hanselmann

1453 b95479a5 Michael Hanselmann
    """
1454 b95479a5 Michael Hanselmann
    assert ht.TString(job_id)
1455 b95479a5 Michael Hanselmann
1456 37d76f1e Michael Hanselmann
    self._lock.acquire()
1457 37d76f1e Michael Hanselmann
    try:
1458 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1459 37d76f1e Michael Hanselmann
1460 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1461 37d76f1e Michael Hanselmann
    finally:
1462 37d76f1e Michael Hanselmann
      self._lock.release()
1463 37d76f1e Michael Hanselmann
1464 b95479a5 Michael Hanselmann
    if jobs:
1465 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1466 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1467 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1468 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1469 b95479a5 Michael Hanselmann
1470 b95479a5 Michael Hanselmann
1471 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
  """Decorator asserting that the queue is open before calling a method.

  This decorator should be used for all 'public' functions, that is,
  functions usually called from other classes. It must be applied only to
  methods (not plain functions), as the first argument of the decorated
  function is expected to have a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  @param fn: Method to be wrapped
  @return: Wrapper checking the queue before calling C{fn} with unchanged
    arguments

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    filelock = self._queue_filelock
    # Only None counts as a closed queue
    assert filelock is not None, "Queue should be open"

    return fn(self, *args, **kwargs)

  return wrapper
1494 db37da70 Michael Hanselmann
1495 db37da70 Michael Hanselmann
1496 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
  """Decorator refusing calls while the queue doesn't take new jobs.

  To be used with functions submitting new jobs. The first argument of the
  decorated function must provide the C{_drained} and C{_accepting_jobs}
  attributes.

  @param fn: Method to be wrapped
  @return: Wrapper checking the queue before calling C{fn} with unchanged
    arguments

  """
  def wrapper(self, *args, **kwargs):
    """Checks the queue state, then calls the wrapped function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueError: if the job queue no longer accepts jobs

    """
    # Holding the big job queue lock in shared mode is sufficient here, as the
    # drain file is created when the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    drained = self._drained
    if drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    accepting = self._accepting_jobs
    if not accepting:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)

  return wrapper
1519 c8d0be94 Michael Hanselmann
1520 c8d0be94 Michael Hanselmann
1521 6c881c52 Iustin Pop
class JobQueue(object):
1522 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1523 db37da70 Michael Hanselmann

1524 6c881c52 Iustin Pop
  """
1525 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the in-memory state of the queue, takes the queue file lock
    and then inspects the jobs currently on disk, either to restart them
    (if they were queued) or to abort them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it in shared mode and all code
    # acquiring it exclusively.
    queue_lock = locking.SharedLock("JobQueue")
    self._lock = queue_lock
    self.acquire = queue_lock.acquire
    self.release = queue_lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes: only master candidates are tracked
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      # Whatever went wrong, don't leave worker threads behind
      self._wpool.TerminateWorkers()
      raise
1591 711b5124 Michael Hanselmann
1592 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1593 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
1594 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    Queued jobs are scheduled again as they are. Jobs found in waiting
    state have their unfinished opcodes reset to queued and are scheduled
    again as well. Jobs found running or canceling have their unfinished
    opcodes marked as failed and are finalized.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    job_ids = self._GetJobIDsUnlocked()
    total = len(job_ids)
    last_idx = total - 1

    to_restart = []
    last_report = time.time()

    for (idx, job_id) in enumerate(job_ids):
      # Give an update every 1000 jobs or 10 seconds
      report_due = (idx % 1000 == 0 or
                    time.time() >= (last_report + 10.0) or
                    idx == last_idx)
      if report_due:
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, last_idx, 100.0 * (idx + 1) / total)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        to_restart.append(job)
        continue

      if status not in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING):
        # Nothing to do for jobs in any other state
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        to_restart.append(job)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

      self.UpdateJobUnlocked(job)

    if to_restart:
      logging.info("Restarting %s jobs", len(to_restart))
      self._EnqueueJobsUnlocked(to_restart)

    logging.info("Job queue inspection finished")
1648 85f03e0d Michael Hanselmann
1649 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    @param address_list: handed unchanged to L{rpc.JobQueueRunner}; callers
        in this class pass either C{None} or a list of node addresses
    @return: a new L{rpc.JobQueueRunner} using this queue's context

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1654 fb1ffbca Michael Hanselmann
1655 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1656 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1657 99aabbed Iustin Pop
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is a
    master candidate, all non-archived job files and the serial file are
    then uploaded to it and the node is recorded for future replication;
    otherwise the node is dropped from the list of known nodes. RPC
    failures are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_result = self._GetRpc(None).call_jobqueue_purge(name)
    purge_error = purge_result.fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication of
      # the job files
      self._nodes.pop(name, None)
      return

    # Upload the whole queue excluding archived jobs
    upload = []
    for job_id in self._GetJobIDsUnlocked():
      upload.append(self._GetJobPath(job_id))

    # Upload current serial file
    upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in upload:
      # Read file content
      content = utils.ReadFile(file_name)

      rpc_runner = self._GetRpc(addrs)
      result = rpc_runner.call_jobqueue_update([name], file_name, content)
      upload_error = result[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, name, upload_error)

    self._nodes[name] = node.primary_ip
1701 d2e03a33 Michael Hanselmann
1702 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1703 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1704 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Forgets the node for the purpose of queue replication; unknown names
    are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not registered with the queue, nothing to do
      pass
1712 23752136 Michael Hanselmann
1713 7e950d31 Iustin Pop
  @staticmethod
1714 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1715 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1716 ea03467c Iustin Pop

1717 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1718 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1719 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1720 ea03467c Iustin Pop

1721 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1722 ea03467c Iustin Pop
    @type nodes: list
1723 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1724 ea03467c Iustin Pop
    @type failmsg: str
1725 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1726 ea03467c Iustin Pop

1727 ea03467c Iustin Pop
    """
1728 e74798c1 Michael Hanselmann
    failed = []
1729 e74798c1 Michael Hanselmann
    success = []
1730 e74798c1 Michael Hanselmann
1731 e74798c1 Michael Hanselmann
    for node in nodes:
1732 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1733 c8457ce7 Iustin Pop
      if msg:
1734 e74798c1 Michael Hanselmann
        failed.append(node)
1735 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1736 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1737 c8457ce7 Iustin Pop
      else:
1738 c8457ce7 Iustin Pop
        success.append(node)
1739 e74798c1 Michael Hanselmann
1740 e74798c1 Michael Hanselmann
    # +1 for the master node
1741 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1742 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1743 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1744 e74798c1 Michael Hanselmann
1745 99aabbed Iustin Pop
  def _GetNodeIp(self):
1746 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1747 99aabbed Iustin Pop

1748 ea03467c Iustin Pop
    @rtype: (list, list)
1749 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1750 ea03467c Iustin Pop
        names and the second one with the node addresses
1751 ea03467c Iustin Pop

1752 99aabbed Iustin Pop
    """
1753 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1754 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1755 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1756 99aabbed Iustin Pop
    return name_list, addr_list
1757 99aabbed Iustin Pop
1758 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The local file is always (re)written, using the master daemon's
    uid/gid as reported by L{runtime.GetEnts}. Only if requested, the new
    contents are then pushed to all nodes known to the queue; failures of
    that step are logged by L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.masterd_gid)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    rpc_runner = self._GetRpc(addrs)
    result = rpc_runner.call_jobqueue_update(names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1780 23752136 Michael Hanselmann
1781 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are first carried out in the local queue directory
    (creating target directories as needed) and are then sent to all the
    other nodes known to the queue; failures of the remote step are logged
    by L{_CheckRpcResult}.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (old_path, new_path) in rename:
      utils.RenameFile(old_path, new_path, mkdir=True)

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    rpc_runner = self._GetRpc(addrs)
    result = rpc_runner.call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1799 abc1f2ce Michael Hanselmann
1800 7e950d31 Iustin Pop
  @staticmethod
1801 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or is
        negative

    """
    if not isinstance(job_id, (int, long)):
      # Wrap the value in a tuple: a bare tuple on the right-hand side of "%"
      # would be unpacked as format arguments and raise TypeError instead of
      # the intended ProgrammerError
      raise errors.ProgrammerError("Job ID '%s' not numeric" % (job_id, ))
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % (job_id, ))

    return str(job_id)
1820 85f03e0d Michael Hanselmann
1821 58b22b6e Michael Hanselmann
  @classmethod
1822 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into archive directories of
    C{JOBS_PER_ARCHIVE_DIRECTORY} consecutive IDs each; the directory name
    is the index of the group the job belongs to.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: the result must stay an integer even when true
    # division is in effect, otherwise the name would look like "1.2345"
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1832 58b22b6e Michael Hanselmann
1833 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    highest serial number is written to the serial file (and replicated)
    before the in-memory counter is advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: the newly allocated job identifiers, in ascending order

    """
    assert ht.TPositiveInt(count)

    # Range of new numbers
    first = self._last_serial + 1
    last = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % last, True)

    job_ids = []
    for serial in range(first, last + 1):
      job_ids.append(self._FormatJobID(serial))

    # Keep it only if we were able to write the file
    self._last_serial = last

    assert len(job_ids) == count

    return job_ids
1862 f1da30e6 Michael Hanselmann
1863 85f03e0d Michael Hanselmann
  @staticmethod
1864 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<id>} and lives directly in the queue
    directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1874 f1da30e6 Michael Hanselmann
1875 58b22b6e Michael Hanselmann
  @classmethod
1876 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<id>} and lives in the per-group subdirectory
    (see L{_GetArchiveDirectory}) of the archive directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, subdir, file_name)
1887 0cb94105 Michael Hanselmann
1888 cb66225d Michael Hanselmann
  @staticmethod
1889 cb66225d Michael Hanselmann
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs). Files in the queue directory whose name does not match
    the job file pattern are ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = [constants.JOB_FILE_RE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
1910 911a495b Iustin Pop
1911 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1912 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1913 ea03467c Iustin Pop

1914 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1915 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1916 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1917 ea03467c Iustin Pop

1918 ea03467c Iustin Pop
    @param job_id: the job id
1919 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1920 ea03467c Iustin Pop
    @return: either None or the job object
1921 ea03467c Iustin Pop

1922 ea03467c Iustin Pop
    """
1923 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1924 5685c1a5 Michael Hanselmann
    if job:
1925 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1926 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
1927 5685c1a5 Michael Hanselmann
      return job
1928 ac0930b9 Iustin Pop
1929 3d6c5566 Guido Trotter
    try:
1930 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
1931 aa9f8167 Iustin Pop
      if job is None:
1932 aa9f8167 Iustin Pop
        return job
1933 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1934 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1935 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1936 3d6c5566 Guido Trotter
      if old_path == new_path:
1937 3d6c5566 Guido Trotter
        # job already archived (future case)
1938 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1939 3d6c5566 Guido Trotter
      else:
1940 3d6c5566 Guido Trotter
        # non-archived case
1941 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1942 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1943 3d6c5566 Guido Trotter
      return None
1944 162c8636 Guido Trotter
1945 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
1946 c0f6d0d8 Michael Hanselmann
1947 162c8636 Guido Trotter
    self._memcache[job_id] = job
1948 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1949 162c8636 Guido Trotter
    return job
1950 162c8636 Guido Trotter
1951 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1952 162c8636 Guido Trotter
    """Load the given job file from disk.
1953 162c8636 Guido Trotter

1954 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1955 162c8636 Guido Trotter

1956 162c8636 Guido Trotter
    @type job_id: string
1957 162c8636 Guido Trotter
    @param job_id: job identifier
1958 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1959 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1960 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1961 162c8636 Guido Trotter
    @return: either None or the job object
1962 162c8636 Guido Trotter

1963 162c8636 Guido Trotter
    """
1964 c0f6d0d8 Michael Hanselmann
    path_functions = [(self._GetJobPath, True)]
1965 194c8ca4 Michael Hanselmann
1966 194c8ca4 Michael Hanselmann
    if try_archived:
1967 c0f6d0d8 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, False))
1968 194c8ca4 Michael Hanselmann
1969 194c8ca4 Michael Hanselmann
    raw_data = None
1970 c0f6d0d8 Michael Hanselmann
    writable_default = None
1971 194c8ca4 Michael Hanselmann
1972 c0f6d0d8 Michael Hanselmann
    for (fn, writable_default) in path_functions:
1973 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
1974 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
1975 194c8ca4 Michael Hanselmann
      try:
1976 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
1977 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
1978 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
1979 194c8ca4 Michael Hanselmann
          raise
1980 194c8ca4 Michael Hanselmann
      else:
1981 194c8ca4 Michael Hanselmann
        break
1982 194c8ca4 Michael Hanselmann
1983 194c8ca4 Michael Hanselmann
    if not raw_data:
1984 194c8ca4 Michael Hanselmann
      return None
1985 13998ef2 Michael Hanselmann
1986 c0f6d0d8 Michael Hanselmann
    if writable is None:
1987 c0f6d0d8 Michael Hanselmann
      writable = writable_default
1988 c0f6d0d8 Michael Hanselmann
1989 94ed59a5 Iustin Pop
    try:
1990 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1991 c0f6d0d8 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable)
1992 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1993 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1994 94ed59a5 Iustin Pop
1995 ac0930b9 Iustin Pop
    return job
1996 f1da30e6 Michael Hanselmann
1997 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: passed unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2017 0f9c08dc Guido Trotter
2018 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    The size is recomputed as the number of job files currently found by
    L{_GetJobIDsUnlocked}; sorting is skipped as only the count matters.

    """
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)
2023 20571a26 Guido Trotter
2024 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2025 20571a26 Guido Trotter
  @_RequireOpenQueue
2026 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is first handed to L{jstore.SetDrainFlag} and then mirrored
    in C{self._drained}, which is what job submission checks.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    # Update the store first; the in-memory copy is only changed if that
    # call did not raise
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    return True
2038 3ccafd0e Iustin Pop
2039 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2040 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    The job is validated, written to disk via L{UpdateJobUnlocked} and
    added to the in-memory cache; the caller is expected to put the
    returned job on the queue for it to be picked up by the queue
    processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority and dependencies of every opcode before touching disk
    for (idx, op) in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      deps = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(deps):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   deps))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
2084 f1da30e6 Michael Hanselmann
2085 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2086 2971c913 Iustin Pop
  @_RequireOpenQueue
2087 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2088 2971c913 Iustin Pop
  def SubmitJob(self, ops):
    """Submits a single job and hands it to the worker pool.

    A fresh job ID is allocated, the job is created through
    L{_SubmitJobUnlocked} and then enqueued.

    @see: L{_SubmitJobUnlocked}
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: the ID allocated for the new job

    """
    # Exactly one serial is requested, hence the one-element unpacking
    (job_id, ) = self._NewSerialsUnlocked(1)
    new_job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobsUnlocked([new_job])
    return job_id
2097 2971c913 Iustin Pop
2098 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2099 2971c913 Iustin Pop
  @_RequireOpenQueue
2100 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2101 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
    """Submits several jobs at once and enqueues the accepted ones.

    One job ID is allocated per entry in C{jobs}; the actual work is done
    by L{_SubmitManyJobsUnlocked}, called with an empty list of previous
    job IDs.

    @see: L{_SubmitJobUnlocked}
    @type jobs: list
    @param jobs: one list of opcodes per job
    @rtype: list
    @return: the per-job results produced by L{_SubmitManyJobsUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(len(jobs))

    submitted = self._SubmitManyJobsUnlocked(jobs, new_ids, [])
    (results, added_jobs) = submitted

    # Only jobs which were actually created are given to the worker pool
    self._EnqueueJobsUnlocked(added_jobs)

    return results
2115 2971c913 Iustin Pop
2116 b247c6fc Michael Hanselmann
  @staticmethod
2117 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
    """Builds the error text for a job which could not be submitted.

    @param msg: the error message
    @param ops: the opcodes of the failed job; each one's C{Summary()} is
        appended to the message
    @rtype: string
    @return: C{msg} followed by the comma-joined opcode summaries

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2123 b247c6fc Michael Hanselmann
2124 b247c6fc Michael Hanselmann
  @staticmethod
2125 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
    """Replaces relative job IDs in a dependency list by resolved ones.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID; expected to
        raise C{IndexError} if the ID can't be resolved
    @type deps: list
    @param deps: Dependencies, as (job ID, status) pairs
    @rtype: tuple
    @return: C{(True, resolved dependencies)} on success, or
        C{(False, error message)} as soon as one relative job ID can't be
        resolved

    """
    resolved = []

    for (dep_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_id):
        # Not a relative ID, taken over unchanged
        resolved.append((dep_id, dep_status))
        continue

      assert ht.TInt(dep_id) and dep_id < 0

      try:
        target_id = resolve_fn(dep_id)
      except IndexError:
        # Abort, the partial result is discarded
        return (False, "Unable to resolve relative job ID %s" % dep_id)

      resolved.append((target_id, dep_status))

    return (True, resolved)
2152 b247c6fc Michael Hanselmann
2153 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Creates and stores several jobs, resolving relative dependencies.

    For each job, the dependencies of its opcodes are resolved first; a
    relative (negative) job ID is used as an index into the list made up of
    C{previous_job_ids} followed by the IDs of the jobs preceding the
    current one in this call. Only if all dependencies of a job could be
    resolved is the job submitted via L{_SubmitJobUnlocked}.

    @see: L{_SubmitJobUnlocked}
    @type jobs: list
    @param jobs: one list of opcodes per job
    @type job_ids: list
    @param job_ids: job IDs to use, paired with C{jobs} by position
    @type previous_job_ids: list
    @param previous_job_ids: job IDs relative dependencies may also refer to
    @rtype: tuple
    @return: C{(results, added_jobs)}; C{results} holds one
        C{(status, data)} pair per job (job ID on success, error message
        otherwise), C{added_jobs} the job objects which were created

    """
    results = []
    added_jobs = []

    def _LookupRelative(job_idx, reljobid):
      # Negative index into all job IDs known before job number job_idx
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      resolver = compat.partial(_LookupRelative, idx)
      failure = None

      for op in ops:
        if not getattr(op, opcodes.DEPEND_ATTR, None):
          # Opcode has no dependencies to resolve
          continue

        (ok, payload) = self._ResolveJobDependencies(resolver, op.depends)
        if not ok:
          # Abort resolving dependencies, the job won't be submitted
          assert ht.TNonEmptyString(payload), "No error message"
          failure = (ok, payload)
          break

        # Use resolved dependencies
        op.depends = payload

      if failure is not None:
        results.append(failure)
        continue

      try:
        job = self._SubmitJobUnlocked(job_id, ops)
      except errors.GenericError as err:
        results.append((False, self._FormatSubmitError(str(err), ops)))
      else:
        added_jobs.append(job)
        results.append((True, job_id))

    return (results, added_jobs)
2192 b247c6fc Michael Hanselmann
2193 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2194 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2195 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2196 7b5c4a69 Michael Hanselmann

2197 7b5c4a69 Michael Hanselmann
    @type jobs: list
2198 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2199 7b5c4a69 Michael Hanselmann

2200 7b5c4a69 Michael Hanselmann
    """
2201 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2202 75d81fc8 Michael Hanselmann
2203 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2204 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2205 75d81fc8 Michael Hanselmann

2206 75d81fc8 Michael Hanselmann
    @type jobs: list
2207 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2208 75d81fc8 Michael Hanselmann

2209 75d81fc8 Michael Hanselmann
    """
2210 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2211 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2212 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
2213 7b5c4a69 Michael Hanselmann
2214 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID; a value which is not a string is first converted
        using L{_FormatJobID}
    @return: the job's status as computed by its C{CalcStatus} method
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if not job:
      raise errors.JobLost("Job %s not found" % job_id)

    # Must only be checked once the job is known to exist; doing it on a
    # missing job would fail with AttributeError instead of raising JobLost
    assert not job.writable, "Got writable job" # pylint: disable=E1101

    return job.CalcStatus()
2236 b95479a5 Michael Hanselmann
2237 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2238 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
    """Writes a job's current state to its file in the queue.

    Needs to be called after a job has been modified; the serialized job is
    passed to L{_UpdateJobQueueFile} together with the C{replicate} flag.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # A job has an end timestamp if, and only if, it is finalized
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      has_end_timestamp = job.end_timestamp is not None
      assert is_finalized == has_end_timestamp
      assert job.writable, "Can't update read-only job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())

    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2260 ac0930b9 Iustin Pop
2261 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits until a job changes or the timeout expires.

    The waiting itself is delegated to a L{_WaitForJobChangesHelper}
    instance, which is given the job's file path and a function loading the
    job read-only via L{SafeLoadJobFromDisk}.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    # Only the job ID is bound here; the helper decides when to (re)load
    loader = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                            writable=False)

    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, loader, fields, prev_job_info, prev_log_serial,
                  timeout)
2292 dfe57c22 Michael Hanselmann
2293 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2294 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2295 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
    """Tries to cancel a job.

    Whether cancelling is possible is decided by the job's own C{Cancel}
    method; if it reports success, the job is written back to disk.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}; C{success} is C{False} if the job
        couldn't be loaded

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()
    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)
2321 fbf0262f Michael Hanselmann
2322 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2323 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
    """Moves finalized jobs to the archive.

    Jobs whose status is not in L{constants.JOBS_FINALIZED} are left alone.
    For all others the job file is renamed from its queue path to its
    archive path; afterwards the cached queue size is refreshed.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    finalized = []
    renames = []

    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() in constants.JOBS_FINALIZED:
        finalized.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in finalized))

    # The result of the renames isn't checked above, hence the cached queue
    # size is recomputed instead of being decremented. Once the TODO is
    # fixed, the number of actually archived jobs can be used instead.
    self._UpdateQueueSizeUnlocked()

    return len(finalized)
2359 78d12585 Michael Hanselmann
2360 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2361 07cd723a Iustin Pop
  @_RequireOpenQueue
2362 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
    """Archives a single job.

    Loads the job and passes it to L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; C{False} also if the job couldn't be
        loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      # Archived if, and only if, exactly one job is reported as archived
      return self._ArchiveJobsUnlocked([job]) == 1

    logging.debug("Job %s not found", job_id)
    return False
2381 07cd723a Iustin Pop
2382 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2383 07cd723a Iustin Pop
  @_RequireOpenQueue
2384 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. The age of a job is computed from its end timestamp; for
    jobs that don't have one, the start timestamp is used, and if that is
    missing too, the received timestamp. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    Jobs are examined in the order returned by L{_GetJobIDsUnlocked} and
    archived in batches of ten. Once C{timeout} has expired no further jobs
    are examined; jobs already selected are still archived.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: maximum time, in seconds, to spend examining jobs
    @rtype: tuple
    @return: C{(archived, unchecked)}: the number of jobs archived and the
        number of job IDs which were not examined because of the timeout

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Only counted as touched once we're sure the job gets examined;
      # otherwise the job we stopped at would be missing from the number of
      # unchecked jobs
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared (presumably the
      # seconds part)
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2437 07cd723a Iustin Pop
2438 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it needs.

    @param fields: List of wanted fields
    @param qfilter: Query filter
    @rtype: tuple
    @return: C{(query object, list of (job ID, job) pairs, list_all)};
        C{list_all} is C{True} if the filter didn't name specific jobs, in
        which case all jobs in the queue are loaded

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    requested_ids = qobj.RequestedNames()
    list_all = (requested_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
    else:
      job_ids = requested_ids

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is None and list_all:
        # When listing everything, IDs without a loaded job are left out;
        # explicitly requested IDs are always reported, even with None
        continue
      jobs.append((job_id, job))

    return (qobj, jobs, list_all)
2459 e07f7f7a Michael Hanselmann
2460 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Queries the jobs in the queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the value returned by L{query.GetQueryResponse} for the
        matching jobs

    """
    prepared = self._Query(fields, qfilter)
    (qobj, ctx, sort_by_name) = prepared

    return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
2472 e07f7f7a Michael Hanselmann
2473 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Queries jobs by ID, returning old-style results.

    The job IDs are turned into a simple filter on the "id" field, which is
    then evaluated like any other query.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    id_filter = qlang.MakeSimpleFilter("id", job_ids)

    prepared = self._Query(fields, id_filter)
    (qobj, ctx, sort_by_name) = prepared

    return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
2490 e2715f69 Michael Hanselmann
2491 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2492 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
    """Prepares the job queue for being stopped.

    On the first call the queue stops accepting jobs (the check itself is
    presumably done by L{_RequireNonDrainedQueue}) and the worker pool is
    told to no longer process pending tasks. Every call reports whether the
    worker pool still has running tasks; as long as that is the case the
    queue is not yet ready for L{Shutdown}. Once C{False} is returned,
    L{Shutdown} can be called without interfering with any job. Queued and
    unfinished jobs will be resumed next time.

    @rtype: bool
    @return: Whether there are any running jobs

    """
    pool = self._wpool

    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      pool.SetActive(False)

    return pool.HasRunningTasks()
2515 6d5ea385 Michael Hanselmann
2516 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2517 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2518 e2715f69 Michael Hanselmann
  def Shutdown(self):
    """Stops the job queue.

    This terminates all the worker threads and closes the queue's file
    lock, which is then reset to C{None}.

    """
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_filelock
    queue_lock.Close()
    self._queue_filelock = None