Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 68a856ef

History | View | Annotate | Download (73.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 b95479a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 b95479a5 Michael Hanselmann
from ganeti import ht
60 e2715f69 Michael Hanselmann
61 fbf0262f Michael Hanselmann
62 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
63 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
64 e2715f69 Michael Hanselmann
65 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
66 ebb80afa Guido Trotter
_LOCK = "_lock"
67 ebb80afa Guido Trotter
_QUEUE = "_queue"
68 99bd4f0a Guido Trotter
69 498ae1cc Iustin Pop
70 9728ae5d Iustin Pop
class CancelJob(Exception):
71 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
72 fbf0262f Michael Hanselmann

73 fbf0262f Michael Hanselmann
  """
74 fbf0262f Michael Hanselmann
75 fbf0262f Michael Hanselmann
76 70552c46 Michael Hanselmann
def TimeStampNow():
77 ea03467c Iustin Pop
  """Returns the current timestamp.
78 ea03467c Iustin Pop

79 ea03467c Iustin Pop
  @rtype: tuple
80 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
81 ea03467c Iustin Pop

82 ea03467c Iustin Pop
  """
83 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
84 70552c46 Michael Hanselmann
85 70552c46 Michael Hanselmann
86 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
87 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
88 e2715f69 Michael Hanselmann

89 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
90 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
91 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
92 ea03467c Iustin Pop
  @ivar status: the current status
93 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
94 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
95 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
97 f1048938 Iustin Pop

98 e2715f69 Michael Hanselmann
  """
99 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
100 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
101 66d895a8 Iustin Pop
               "__weakref__"]
102 66d895a8 Iustin Pop
103 85f03e0d Michael Hanselmann
  def __init__(self, op):
104 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
105 ea03467c Iustin Pop

106 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
107 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
108 ea03467c Iustin Pop

109 ea03467c Iustin Pop
    """
110 85f03e0d Michael Hanselmann
    self.input = op
111 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
112 85f03e0d Michael Hanselmann
    self.result = None
113 85f03e0d Michael Hanselmann
    self.log = []
114 70552c46 Michael Hanselmann
    self.start_timestamp = None
115 b9b5abcb Iustin Pop
    self.exec_timestamp = None
116 70552c46 Michael Hanselmann
    self.end_timestamp = None
117 f1da30e6 Michael Hanselmann
118 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
119 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120 8f5c488d Michael Hanselmann
121 f1da30e6 Michael Hanselmann
  @classmethod
122 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
123 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
124 ea03467c Iustin Pop

125 ea03467c Iustin Pop
    @type state: dict
126 ea03467c Iustin Pop
    @param state: the serialized state
127 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
128 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
129 ea03467c Iustin Pop

130 ea03467c Iustin Pop
    """
131 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
132 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133 85f03e0d Michael Hanselmann
    obj.status = state["status"]
134 85f03e0d Michael Hanselmann
    obj.result = state["result"]
135 85f03e0d Michael Hanselmann
    obj.log = state["log"]
136 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
137 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
138 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
139 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140 f1da30e6 Michael Hanselmann
    return obj
141 f1da30e6 Michael Hanselmann
142 f1da30e6 Michael Hanselmann
  def Serialize(self):
143 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
144 ea03467c Iustin Pop

145 ea03467c Iustin Pop
    @rtype: dict
146 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
147 ea03467c Iustin Pop

148 ea03467c Iustin Pop
    """
149 6c5a7090 Michael Hanselmann
    return {
150 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
151 6c5a7090 Michael Hanselmann
      "status": self.status,
152 6c5a7090 Michael Hanselmann
      "result": self.result,
153 6c5a7090 Michael Hanselmann
      "log": self.log,
154 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
155 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
156 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
157 8f5c488d Michael Hanselmann
      "priority": self.priority,
158 6c5a7090 Michael Hanselmann
      }
159 f1048938 Iustin Pop
160 e2715f69 Michael Hanselmann
161 e2715f69 Michael Hanselmann
class _QueuedJob(object):
162 e2715f69 Michael Hanselmann
  """In-memory job representation.
163 e2715f69 Michael Hanselmann

164 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
165 ea03467c Iustin Pop
  be taken care of by users of this class.
166 ea03467c Iustin Pop

167 ea03467c Iustin Pop
  @type queue: L{JobQueue}
168 ea03467c Iustin Pop
  @ivar queue: the parent queue
169 ea03467c Iustin Pop
  @ivar id: the job ID
170 ea03467c Iustin Pop
  @type ops: list
171 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
172 ea03467c Iustin Pop
  @type log_serial: int
173 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
174 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
175 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
176 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
177 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
178 e2715f69 Michael Hanselmann

179 e2715f69 Michael Hanselmann
  """
180 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
181 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
183 c0f6d0d8 Michael Hanselmann
               "__weakref__", "processor_lock", "writable"]
184 66d895a8 Iustin Pop
185 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
186 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
187 ea03467c Iustin Pop

188 ea03467c Iustin Pop
    @type queue: L{JobQueue}
189 ea03467c Iustin Pop
    @param queue: our parent queue
190 ea03467c Iustin Pop
    @type job_id: job_id
191 ea03467c Iustin Pop
    @param job_id: our job id
192 ea03467c Iustin Pop
    @type ops: list
193 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
194 ea03467c Iustin Pop
        in _QueuedOpCodes
195 c0f6d0d8 Michael Hanselmann
    @type writable: bool
196 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
197 ea03467c Iustin Pop

198 ea03467c Iustin Pop
    """
199 e2715f69 Michael Hanselmann
    if not ops:
200 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
201 e2715f69 Michael Hanselmann
202 85f03e0d Michael Hanselmann
    self.queue = queue
203 f1da30e6 Michael Hanselmann
    self.id = job_id
204 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
205 6c5a7090 Michael Hanselmann
    self.log_serial = 0
206 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
207 c56ec146 Iustin Pop
    self.start_timestamp = None
208 c56ec146 Iustin Pop
    self.end_timestamp = None
209 6c5a7090 Michael Hanselmann
210 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
211 fa4aa6b4 Michael Hanselmann
212 fa4aa6b4 Michael Hanselmann
  @staticmethod
213 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
214 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
215 fa4aa6b4 Michael Hanselmann

216 fa4aa6b4 Michael Hanselmann
    """
217 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
218 03b63608 Michael Hanselmann
    obj.ops_iter = None
219 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
220 f8a4adfa Michael Hanselmann
221 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
222 f8a4adfa Michael Hanselmann
    if writable:
223 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
224 f8a4adfa Michael Hanselmann
    else:
225 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
226 be760ba8 Michael Hanselmann
227 9fa2e150 Michael Hanselmann
  def __repr__(self):
228 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
229 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
230 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
231 9fa2e150 Michael Hanselmann
232 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
233 9fa2e150 Michael Hanselmann
234 f1da30e6 Michael Hanselmann
  @classmethod
235 c0f6d0d8 Michael Hanselmann
  def Restore(cls, queue, state, writable):
236 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
237 ea03467c Iustin Pop

238 ea03467c Iustin Pop
    @type queue: L{JobQueue}
239 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
240 ea03467c Iustin Pop
    @type state: dict
241 ea03467c Iustin Pop
    @param state: the serialized state
242 c0f6d0d8 Michael Hanselmann
    @type writable: bool
243 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
244 ea03467c Iustin Pop
    @rtype: _JobQueue
245 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
246 ea03467c Iustin Pop

247 ea03467c Iustin Pop
    """
248 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
249 85f03e0d Michael Hanselmann
    obj.queue = queue
250 85f03e0d Michael Hanselmann
    obj.id = state["id"]
251 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
252 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
253 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
254 6c5a7090 Michael Hanselmann
255 6c5a7090 Michael Hanselmann
    obj.ops = []
256 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
257 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
258 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
259 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
260 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
261 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
262 6c5a7090 Michael Hanselmann
263 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
264 be760ba8 Michael Hanselmann
265 f1da30e6 Michael Hanselmann
    return obj
266 f1da30e6 Michael Hanselmann
267 f1da30e6 Michael Hanselmann
  def Serialize(self):
268 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
269 ea03467c Iustin Pop

270 ea03467c Iustin Pop
    @rtype: dict
271 ea03467c Iustin Pop
    @return: the serialized state
272 ea03467c Iustin Pop

273 ea03467c Iustin Pop
    """
274 f1da30e6 Michael Hanselmann
    return {
275 f1da30e6 Michael Hanselmann
      "id": self.id,
276 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
277 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
278 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
279 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
280 f1da30e6 Michael Hanselmann
      }
281 f1da30e6 Michael Hanselmann
282 85f03e0d Michael Hanselmann
  def CalcStatus(self):
283 ea03467c Iustin Pop
    """Compute the status of this job.
284 ea03467c Iustin Pop

285 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
286 ea03467c Iustin Pop
    based on their status, computes the job status.
287 ea03467c Iustin Pop

288 ea03467c Iustin Pop
    The algorithm is:
289 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
290 ea03467c Iustin Pop
        status will be the same
291 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
292 ea03467c Iustin Pop
          - waitlock
293 fbf0262f Michael Hanselmann
          - canceling
294 ea03467c Iustin Pop
          - running
295 ea03467c Iustin Pop

296 ea03467c Iustin Pop
        will determine the job status
297 ea03467c Iustin Pop

298 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
299 ea03467c Iustin Pop
        and the job status will be the same
300 ea03467c Iustin Pop

301 ea03467c Iustin Pop
    @return: the job status
302 ea03467c Iustin Pop

303 ea03467c Iustin Pop
    """
304 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
305 e2715f69 Michael Hanselmann
306 e2715f69 Michael Hanselmann
    all_success = True
307 85f03e0d Michael Hanselmann
    for op in self.ops:
308 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
309 e2715f69 Michael Hanselmann
        continue
310 e2715f69 Michael Hanselmann
311 e2715f69 Michael Hanselmann
      all_success = False
312 e2715f69 Michael Hanselmann
313 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
314 e2715f69 Michael Hanselmann
        pass
315 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
316 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
317 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
318 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
319 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
320 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
321 fbf0262f Michael Hanselmann
        break
322 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
323 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
324 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
325 f1da30e6 Michael Hanselmann
        break
326 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
327 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
328 4cb1d919 Michael Hanselmann
        break
329 e2715f69 Michael Hanselmann
330 e2715f69 Michael Hanselmann
    if all_success:
331 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
332 e2715f69 Michael Hanselmann
333 e2715f69 Michael Hanselmann
    return status
334 e2715f69 Michael Hanselmann
335 8f5c488d Michael Hanselmann
  def CalcPriority(self):
336 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
337 8f5c488d Michael Hanselmann

338 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
339 8f5c488d Michael Hanselmann
    priority is used.
340 8f5c488d Michael Hanselmann

341 8f5c488d Michael Hanselmann
    @rtype: int
342 8f5c488d Michael Hanselmann

343 8f5c488d Michael Hanselmann
    """
344 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
345 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
346 8f5c488d Michael Hanselmann
347 8f5c488d Michael Hanselmann
    if not priorities:
348 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
349 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
350 8f5c488d Michael Hanselmann
351 8f5c488d Michael Hanselmann
    return min(priorities)
352 8f5c488d Michael Hanselmann
353 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
354 ea03467c Iustin Pop
    """Selectively returns the log entries.
355 ea03467c Iustin Pop

356 ea03467c Iustin Pop
    @type newer_than: None or int
357 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
358 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
359 ea03467c Iustin Pop
        than this value
360 ea03467c Iustin Pop
    @rtype: list
361 ea03467c Iustin Pop
    @return: the list of the log entries selected
362 ea03467c Iustin Pop

363 ea03467c Iustin Pop
    """
364 6c5a7090 Michael Hanselmann
    if newer_than is None:
365 6c5a7090 Michael Hanselmann
      serial = -1
366 6c5a7090 Michael Hanselmann
    else:
367 6c5a7090 Michael Hanselmann
      serial = newer_than
368 6c5a7090 Michael Hanselmann
369 6c5a7090 Michael Hanselmann
    entries = []
370 6c5a7090 Michael Hanselmann
    for op in self.ops:
371 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
372 6c5a7090 Michael Hanselmann
373 6c5a7090 Michael Hanselmann
    return entries
374 6c5a7090 Michael Hanselmann
375 6a290889 Guido Trotter
  def GetInfo(self, fields):
376 6a290889 Guido Trotter
    """Returns information about a job.
377 6a290889 Guido Trotter

378 6a290889 Guido Trotter
    @type fields: list
379 6a290889 Guido Trotter
    @param fields: names of fields to return
380 6a290889 Guido Trotter
    @rtype: list
381 6a290889 Guido Trotter
    @return: list with one element for each field
382 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
383 6a290889 Guido Trotter
        has been passed
384 6a290889 Guido Trotter

385 6a290889 Guido Trotter
    """
386 6a290889 Guido Trotter
    row = []
387 6a290889 Guido Trotter
    for fname in fields:
388 6a290889 Guido Trotter
      if fname == "id":
389 6a290889 Guido Trotter
        row.append(self.id)
390 6a290889 Guido Trotter
      elif fname == "status":
391 6a290889 Guido Trotter
        row.append(self.CalcStatus())
392 b8802cc4 Michael Hanselmann
      elif fname == "priority":
393 b8802cc4 Michael Hanselmann
        row.append(self.CalcPriority())
394 6a290889 Guido Trotter
      elif fname == "ops":
395 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
396 6a290889 Guido Trotter
      elif fname == "opresult":
397 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
398 6a290889 Guido Trotter
      elif fname == "opstatus":
399 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
400 6a290889 Guido Trotter
      elif fname == "oplog":
401 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
402 6a290889 Guido Trotter
      elif fname == "opstart":
403 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
404 6a290889 Guido Trotter
      elif fname == "opexec":
405 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
406 6a290889 Guido Trotter
      elif fname == "opend":
407 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
408 b8802cc4 Michael Hanselmann
      elif fname == "oppriority":
409 b8802cc4 Michael Hanselmann
        row.append([op.priority for op in self.ops])
410 6a290889 Guido Trotter
      elif fname == "received_ts":
411 6a290889 Guido Trotter
        row.append(self.received_timestamp)
412 6a290889 Guido Trotter
      elif fname == "start_ts":
413 6a290889 Guido Trotter
        row.append(self.start_timestamp)
414 6a290889 Guido Trotter
      elif fname == "end_ts":
415 6a290889 Guido Trotter
        row.append(self.end_timestamp)
416 6a290889 Guido Trotter
      elif fname == "summary":
417 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
418 6a290889 Guido Trotter
      else:
419 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
420 6a290889 Guido Trotter
    return row
421 6a290889 Guido Trotter
422 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
423 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
424 34327f51 Iustin Pop

425 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
426 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
427 34327f51 Iustin Pop
    finalised are not changed.
428 34327f51 Iustin Pop

429 34327f51 Iustin Pop
    @param status: a given opcode status
430 34327f51 Iustin Pop
    @param result: the opcode result
431 34327f51 Iustin Pop

432 34327f51 Iustin Pop
    """
433 747f6113 Michael Hanselmann
    not_marked = True
434 747f6113 Michael Hanselmann
    for op in self.ops:
435 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
436 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
437 747f6113 Michael Hanselmann
        continue
438 747f6113 Michael Hanselmann
      op.status = status
439 747f6113 Michael Hanselmann
      op.result = result
440 747f6113 Michael Hanselmann
      not_marked = False
441 34327f51 Iustin Pop
442 66bd7445 Michael Hanselmann
  def Finalize(self):
443 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
444 66bd7445 Michael Hanselmann

445 66bd7445 Michael Hanselmann
    """
446 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
447 66bd7445 Michael Hanselmann
448 099b2870 Michael Hanselmann
  def Cancel(self):
449 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
450 a0d2fe2c Michael Hanselmann

451 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
452 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
453 a0d2fe2c Michael Hanselmann
      as canceling and a text message
454 a0d2fe2c Michael Hanselmann

455 a0d2fe2c Michael Hanselmann
    """
456 099b2870 Michael Hanselmann
    status = self.CalcStatus()
457 099b2870 Michael Hanselmann
458 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
459 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460 099b2870 Michael Hanselmann
                             "Job canceled by request")
461 66bd7445 Michael Hanselmann
      self.Finalize()
462 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
463 099b2870 Michael Hanselmann
464 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
465 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
466 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
468 099b2870 Michael Hanselmann
469 86b16e9d Michael Hanselmann
    else:
470 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
471 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
472 099b2870 Michael Hanselmann
473 f1048938 Iustin Pop
474 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
475 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
476 031a3e57 Michael Hanselmann
    """Initializes this class.
477 ea03467c Iustin Pop

478 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
479 031a3e57 Michael Hanselmann
    @param queue: Job queue
480 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
481 031a3e57 Michael Hanselmann
    @param job: Job object
482 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
483 031a3e57 Michael Hanselmann
    @param op: OpCode
484 031a3e57 Michael Hanselmann

485 031a3e57 Michael Hanselmann
    """
486 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
487 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
488 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
489 031a3e57 Michael Hanselmann
490 031a3e57 Michael Hanselmann
    self._queue = queue
491 031a3e57 Michael Hanselmann
    self._job = job
492 031a3e57 Michael Hanselmann
    self._op = op
493 031a3e57 Michael Hanselmann
494 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
495 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
496 dc1e2262 Michael Hanselmann

497 dc1e2262 Michael Hanselmann
    """
498 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
499 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
500 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
501 dc1e2262 Michael Hanselmann
      raise CancelJob()
502 dc1e2262 Michael Hanselmann
503 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
504 031a3e57 Michael Hanselmann
  def NotifyStart(self):
505 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
506 e92376d7 Iustin Pop

507 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
508 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
509 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
510 47099cd1 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
511 e92376d7 Iustin Pop

512 e92376d7 Iustin Pop
    """
513 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
514 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
515 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
516 fbf0262f Michael Hanselmann
517 271daef8 Iustin Pop
    # Cancel here if we were asked to
518 dc1e2262 Michael Hanselmann
    self._CheckCancel()
519 fbf0262f Michael Hanselmann
520 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
521 9bdab621 Michael Hanselmann
522 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
523 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
524 271daef8 Iustin Pop
525 271daef8 Iustin Pop
    # And finally replicate the job status
526 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
527 031a3e57 Michael Hanselmann
528 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
529 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
530 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
531 9bf5e01f Guido Trotter

532 9bf5e01f Guido Trotter
    """
533 9bf5e01f Guido Trotter
    self._job.log_serial += 1
534 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
535 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
536 9bf5e01f Guido Trotter
537 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
538 031a3e57 Michael Hanselmann
    """Append a log entry.
539 031a3e57 Michael Hanselmann

540 031a3e57 Michael Hanselmann
    """
541 031a3e57 Michael Hanselmann
    assert len(args) < 3
542 031a3e57 Michael Hanselmann
543 031a3e57 Michael Hanselmann
    if len(args) == 1:
544 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
545 031a3e57 Michael Hanselmann
      log_msg = args[0]
546 031a3e57 Michael Hanselmann
    else:
547 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
548 031a3e57 Michael Hanselmann
549 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
550 031a3e57 Michael Hanselmann
    # precision.
551 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
552 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
553 031a3e57 Michael Hanselmann
554 acf931b7 Michael Hanselmann
  def CheckCancel(self):
555 acf931b7 Michael Hanselmann
    """Check whether job has been cancelled.
556 ef2df7d3 Michael Hanselmann

557 ef2df7d3 Michael Hanselmann
    """
558 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
559 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
560 dc1e2262 Michael Hanselmann
561 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
562 dc1e2262 Michael Hanselmann
    self._CheckCancel()
563 dc1e2262 Michael Hanselmann
564 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
565 6a373640 Michael Hanselmann
    """Submits jobs for processing.
566 6a373640 Michael Hanselmann

567 6a373640 Michael Hanselmann
    See L{JobQueue.SubmitManyJobs}.
568 6a373640 Michael Hanselmann

569 6a373640 Michael Hanselmann
    """
570 6a373640 Michael Hanselmann
    # Locking is done in job queue
571 6a373640 Michael Hanselmann
    return self._queue.SubmitManyJobs(jobs)
572 6a373640 Michael Hanselmann
573 031a3e57 Michael Hanselmann
574 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
575 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
576 989a8bee Michael Hanselmann
    """Initializes this class.
577 6c2549d6 Guido Trotter

578 989a8bee Michael Hanselmann
    @type fields: list of strings
579 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
580 989a8bee Michael Hanselmann
    @type prev_job_info: string
581 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
582 989a8bee Michael Hanselmann
    @type prev_log_serial: string
583 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
584 6c2549d6 Guido Trotter

585 989a8bee Michael Hanselmann
    """
586 989a8bee Michael Hanselmann
    self._fields = fields
587 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
588 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
589 6c2549d6 Guido Trotter
590 989a8bee Michael Hanselmann
  def __call__(self, job):
591 989a8bee Michael Hanselmann
    """Checks whether job has changed.
592 6c2549d6 Guido Trotter

593 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
594 989a8bee Michael Hanselmann
    @param job: Job object
595 6c2549d6 Guido Trotter

596 6c2549d6 Guido Trotter
    """
597 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Expected read-only job"
598 c0f6d0d8 Michael Hanselmann
599 989a8bee Michael Hanselmann
    status = job.CalcStatus()
600 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
601 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
602 6c2549d6 Guido Trotter
603 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
604 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
605 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
606 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
607 6c2549d6 Guido Trotter
    # significantly different.
608 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
609 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
610 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
611 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
612 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
613 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
614 6c2549d6 Guido Trotter
615 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
616 6c2549d6 Guido Trotter
    # no changes.
617 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
618 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
619 47099cd1 Michael Hanselmann
                       constants.JOB_STATUS_WAITING) or
620 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
621 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
622 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
623 989a8bee Michael Hanselmann
      return (job_info, log_entries)
624 6c2549d6 Guido Trotter
625 989a8bee Michael Hanselmann
    return None
626 989a8bee Michael Hanselmann
627 989a8bee Michael Hanselmann
628 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
629 989a8bee Michael Hanselmann
  def __init__(self, filename):
630 989a8bee Michael Hanselmann
    """Initializes this class.
631 989a8bee Michael Hanselmann

632 989a8bee Michael Hanselmann
    @type filename: string
633 989a8bee Michael Hanselmann
    @param filename: Path to job file
634 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
635 6c2549d6 Guido Trotter

636 989a8bee Michael Hanselmann
    """
637 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
638 989a8bee Michael Hanselmann
    self._inotify_handler = \
639 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
640 989a8bee Michael Hanselmann
    self._notifier = \
641 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
642 989a8bee Michael Hanselmann
    try:
643 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
644 989a8bee Michael Hanselmann
    except Exception:
645 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
646 989a8bee Michael Hanselmann
      self._notifier.stop()
647 989a8bee Michael Hanselmann
      raise
648 989a8bee Michael Hanselmann
649 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
650 989a8bee Michael Hanselmann
    """Callback for inotify.
651 989a8bee Michael Hanselmann

652 989a8bee Michael Hanselmann
    """
653 6c2549d6 Guido Trotter
    if not notifier_enabled:
654 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
655 989a8bee Michael Hanselmann
656 989a8bee Michael Hanselmann
  def Wait(self, timeout):
657 989a8bee Michael Hanselmann
    """Waits for the job file to change.
658 989a8bee Michael Hanselmann

659 989a8bee Michael Hanselmann
    @type timeout: float
660 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
661 989a8bee Michael Hanselmann
    @return: Whether there have been events
662 989a8bee Michael Hanselmann

663 989a8bee Michael Hanselmann
    """
664 989a8bee Michael Hanselmann
    assert timeout >= 0
665 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
666 989a8bee Michael Hanselmann
    if have_events:
667 989a8bee Michael Hanselmann
      self._notifier.read_events()
668 989a8bee Michael Hanselmann
    self._notifier.process_events()
669 989a8bee Michael Hanselmann
    return have_events
670 989a8bee Michael Hanselmann
671 989a8bee Michael Hanselmann
  def Close(self):
672 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
673 989a8bee Michael Hanselmann

674 989a8bee Michael Hanselmann
    """
675 989a8bee Michael Hanselmann
    self._notifier.stop()
676 989a8bee Michael Hanselmann
677 989a8bee Michael Hanselmann
678 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
679 989a8bee Michael Hanselmann
  def __init__(self, filename):
680 989a8bee Michael Hanselmann
    """Initializes this class.
681 989a8bee Michael Hanselmann

682 989a8bee Michael Hanselmann
    @type filename: string
683 989a8bee Michael Hanselmann
    @param filename: Path to job file
684 989a8bee Michael Hanselmann

685 989a8bee Michael Hanselmann
    """
686 989a8bee Michael Hanselmann
    self._filewaiter = None
687 989a8bee Michael Hanselmann
    self._filename = filename
688 6c2549d6 Guido Trotter
689 989a8bee Michael Hanselmann
  def Wait(self, timeout):
690 989a8bee Michael Hanselmann
    """Waits for a job to change.
691 6c2549d6 Guido Trotter

692 989a8bee Michael Hanselmann
    @type timeout: float
693 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
694 989a8bee Michael Hanselmann
    @return: Whether there have been events
695 989a8bee Michael Hanselmann

696 989a8bee Michael Hanselmann
    """
697 989a8bee Michael Hanselmann
    if self._filewaiter:
698 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
699 989a8bee Michael Hanselmann
700 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
701 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
702 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
703 989a8bee Michael Hanselmann
    # race condition.
704 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
705 989a8bee Michael Hanselmann
706 989a8bee Michael Hanselmann
    return True
707 989a8bee Michael Hanselmann
708 989a8bee Michael Hanselmann
  def Close(self):
709 989a8bee Michael Hanselmann
    """Closes underlying waiter.
710 989a8bee Michael Hanselmann

711 989a8bee Michael Hanselmann
    """
712 989a8bee Michael Hanselmann
    if self._filewaiter:
713 989a8bee Michael Hanselmann
      self._filewaiter.Close()
714 989a8bee Michael Hanselmann
715 989a8bee Michael Hanselmann
716 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
717 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
718 989a8bee Michael Hanselmann

719 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
720 989a8bee Michael Hanselmann
  the current job status has changed.
721 989a8bee Michael Hanselmann

722 989a8bee Michael Hanselmann
  """
723 989a8bee Michael Hanselmann
  @staticmethod
724 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
725 dfc8824a Michael Hanselmann
    if counter.next() > 0:
726 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
727 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
728 dfc8824a Michael Hanselmann
      # changes/messages.
729 dfc8824a Michael Hanselmann
      time.sleep(0.1)
730 dfc8824a Michael Hanselmann
731 989a8bee Michael Hanselmann
    job = job_load_fn()
732 989a8bee Michael Hanselmann
    if not job:
733 989a8bee Michael Hanselmann
      raise errors.JobLost()
734 989a8bee Michael Hanselmann
735 989a8bee Michael Hanselmann
    result = check_fn(job)
736 989a8bee Michael Hanselmann
    if result is None:
737 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
738 989a8bee Michael Hanselmann
739 989a8bee Michael Hanselmann
    return result
740 989a8bee Michael Hanselmann
741 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
742 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
743 989a8bee Michael Hanselmann
    """Waits for changes on a job.
744 989a8bee Michael Hanselmann

745 989a8bee Michael Hanselmann
    @type filename: string
746 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
747 989a8bee Michael Hanselmann
    @type job_load_fn: callable
748 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
749 989a8bee Michael Hanselmann
    @type fields: list of strings
750 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
751 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
752 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
753 989a8bee Michael Hanselmann
    @type prev_log_serial: int
754 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
755 989a8bee Michael Hanselmann
    @type timeout: float
756 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
757 989a8bee Michael Hanselmann

758 989a8bee Michael Hanselmann
    """
759 dfc8824a Michael Hanselmann
    counter = itertools.count()
760 6c2549d6 Guido Trotter
    try:
761 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
762 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
763 989a8bee Michael Hanselmann
      try:
764 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
765 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
766 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
767 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
768 989a8bee Michael Hanselmann
      finally:
769 989a8bee Michael Hanselmann
        waiter.Close()
770 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
771 6c2549d6 Guido Trotter
      return None
772 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
773 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
774 6c2549d6 Guido Trotter
775 6c2549d6 Guido Trotter
776 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
777 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
778 6760e4ed Michael Hanselmann

779 6760e4ed Michael Hanselmann
  """
780 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
781 6760e4ed Michael Hanselmann
    to_encode = err
782 6760e4ed Michael Hanselmann
  else:
783 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
784 6760e4ed Michael Hanselmann
785 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
786 6760e4ed Michael Hanselmann
787 6760e4ed Michael Hanselmann
788 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
789 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
790 26d3fd2f Michael Hanselmann
    """Initializes this class.
791 26d3fd2f Michael Hanselmann

792 26d3fd2f Michael Hanselmann
    """
793 26d3fd2f Michael Hanselmann
    self._fn = fn
794 26d3fd2f Michael Hanselmann
    self._next = None
795 26d3fd2f Michael Hanselmann
796 26d3fd2f Michael Hanselmann
  def _Advance(self):
797 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
798 26d3fd2f Michael Hanselmann

799 26d3fd2f Michael Hanselmann
    """
800 26d3fd2f Michael Hanselmann
    if self._next is None:
801 26d3fd2f Michael Hanselmann
      self._next = self._fn()
802 26d3fd2f Michael Hanselmann
803 26d3fd2f Michael Hanselmann
  def Peek(self):
804 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
805 26d3fd2f Michael Hanselmann

806 26d3fd2f Michael Hanselmann
    """
807 26d3fd2f Michael Hanselmann
    self._Advance()
808 26d3fd2f Michael Hanselmann
    return self._next
809 26d3fd2f Michael Hanselmann
810 26d3fd2f Michael Hanselmann
  def Next(self):
811 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
812 26d3fd2f Michael Hanselmann

813 26d3fd2f Michael Hanselmann
    """
814 26d3fd2f Michael Hanselmann
    self._Advance()
815 26d3fd2f Michael Hanselmann
    result = self._next
816 26d3fd2f Michael Hanselmann
    self._next = None
817 26d3fd2f Michael Hanselmann
    return result
818 26d3fd2f Michael Hanselmann
819 26d3fd2f Michael Hanselmann
820 b80cc518 Michael Hanselmann
class _OpExecContext:
821 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
822 b80cc518 Michael Hanselmann
    """Initializes this class.
823 b80cc518 Michael Hanselmann

824 b80cc518 Michael Hanselmann
    """
825 b80cc518 Michael Hanselmann
    self.op = op
826 b80cc518 Michael Hanselmann
    self.index = index
827 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
828 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
829 b80cc518 Michael Hanselmann
830 b95479a5 Michael Hanselmann
    # Create local copy to modify
831 b95479a5 Michael Hanselmann
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
832 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
833 b95479a5 Michael Hanselmann
    else:
834 b95479a5 Michael Hanselmann
      self.jobdeps = None
835 b95479a5 Michael Hanselmann
836 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
837 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
838 26d3fd2f Michael Hanselmann
839 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
840 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
841 26d3fd2f Michael Hanselmann

842 26d3fd2f Michael Hanselmann
    """
843 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
844 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
845 26d3fd2f Michael Hanselmann
846 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
847 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
848 26d3fd2f Michael Hanselmann

849 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
850 26d3fd2f Michael Hanselmann

851 26d3fd2f Michael Hanselmann
    """
852 26d3fd2f Michael Hanselmann
    op = self.op
853 26d3fd2f Michael Hanselmann
854 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
855 26d3fd2f Michael Hanselmann
    # for locks?
856 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
857 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
858 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
859 26d3fd2f Michael Hanselmann
      op.priority -= 1
860 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
861 26d3fd2f Michael Hanselmann
      return True
862 26d3fd2f Michael Hanselmann
863 26d3fd2f Michael Hanselmann
    return False
864 26d3fd2f Michael Hanselmann
865 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
866 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
867 26d3fd2f Michael Hanselmann

868 26d3fd2f Michael Hanselmann
    """
869 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
870 26d3fd2f Michael Hanselmann
871 b80cc518 Michael Hanselmann
872 be760ba8 Michael Hanselmann
class _JobProcessor(object):
873 75d81fc8 Michael Hanselmann
  (DEFER,
874 75d81fc8 Michael Hanselmann
   WAITDEP,
875 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
876 75d81fc8 Michael Hanselmann
877 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
878 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
879 be760ba8 Michael Hanselmann
    """Initializes this class.
880 be760ba8 Michael Hanselmann

881 be760ba8 Michael Hanselmann
    """
882 be760ba8 Michael Hanselmann
    self.queue = queue
883 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
884 be760ba8 Michael Hanselmann
    self.job = job
885 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
886 be760ba8 Michael Hanselmann
887 be760ba8 Michael Hanselmann
  @staticmethod
888 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
889 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
890 be760ba8 Michael Hanselmann

891 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
892 be760ba8 Michael Hanselmann
    @param job: Job object
893 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
894 be760ba8 Michael Hanselmann

895 be760ba8 Michael Hanselmann
    """
896 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
897 be760ba8 Michael Hanselmann
    # lookups
898 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
899 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
900 03b63608 Michael Hanselmann
    if job.ops_iter is None:
901 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
902 be760ba8 Michael Hanselmann
903 be760ba8 Michael Hanselmann
    # Find next opcode to run
904 be760ba8 Michael Hanselmann
    while True:
905 be760ba8 Michael Hanselmann
      try:
906 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
907 be760ba8 Michael Hanselmann
      except StopIteration:
908 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
909 be760ba8 Michael Hanselmann
910 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
911 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
912 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
913 be760ba8 Michael Hanselmann
914 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
915 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
916 be760ba8 Michael Hanselmann
917 66bd7445 Michael Hanselmann
      if op.status not in constants.OPS_FINALIZED:
918 66bd7445 Michael Hanselmann
        return opctx
919 be760ba8 Michael Hanselmann
920 66bd7445 Michael Hanselmann
      # This is a job that was partially completed before master daemon
921 66bd7445 Michael Hanselmann
      # shutdown, so it can be expected that some opcodes are already
922 66bd7445 Michael Hanselmann
      # completed successfully (if any did error out, then the whole job
923 66bd7445 Michael Hanselmann
      # should have been aborted and not resubmitted for processing).
924 66bd7445 Michael Hanselmann
      logging.info("%s: opcode %s already processed, skipping",
925 66bd7445 Michael Hanselmann
                   opctx.log_prefix, opctx.summary)
926 be760ba8 Michael Hanselmann
927 be760ba8 Michael Hanselmann
  @staticmethod
928 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
929 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
930 be760ba8 Michael Hanselmann

931 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
932 be760ba8 Michael Hanselmann

933 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
934 be760ba8 Michael Hanselmann
    @param job: Job object
935 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
936 a38e8674 Michael Hanselmann
    @param op: Opcode object
937 be760ba8 Michael Hanselmann

938 be760ba8 Michael Hanselmann
    """
939 be760ba8 Michael Hanselmann
    assert op in job.ops
940 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
941 47099cd1 Michael Hanselmann
                         constants.OP_STATUS_WAITING)
942 5fd6b694 Michael Hanselmann
943 5fd6b694 Michael Hanselmann
    update = False
944 be760ba8 Michael Hanselmann
945 be760ba8 Michael Hanselmann
    op.result = None
946 5fd6b694 Michael Hanselmann
947 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
948 47099cd1 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITING
949 5fd6b694 Michael Hanselmann
      update = True
950 5fd6b694 Michael Hanselmann
951 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
952 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
953 5fd6b694 Michael Hanselmann
      update = True
954 be760ba8 Michael Hanselmann
955 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
956 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
957 5fd6b694 Michael Hanselmann
      update = True
958 5fd6b694 Michael Hanselmann
959 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
960 5fd6b694 Michael Hanselmann
961 5fd6b694 Michael Hanselmann
    return update
962 be760ba8 Michael Hanselmann
963 b95479a5 Michael Hanselmann
  @staticmethod
964 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
965 b95479a5 Michael Hanselmann
    """Checks if an opcode has dependencies and if so, processes them.
966 b95479a5 Michael Hanselmann

967 b95479a5 Michael Hanselmann
    @type queue: L{JobQueue}
968 b95479a5 Michael Hanselmann
    @param queue: Queue object
969 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
970 b95479a5 Michael Hanselmann
    @param job: Job object
971 b95479a5 Michael Hanselmann
    @type opctx: L{_OpExecContext}
972 b95479a5 Michael Hanselmann
    @param opctx: Opcode execution context
973 b95479a5 Michael Hanselmann
    @rtype: bool
974 b95479a5 Michael Hanselmann
    @return: Whether opcode will be re-scheduled by dependency tracker
975 b95479a5 Michael Hanselmann

976 b95479a5 Michael Hanselmann
    """
977 b95479a5 Michael Hanselmann
    op = opctx.op
978 b95479a5 Michael Hanselmann
979 b95479a5 Michael Hanselmann
    result = False
980 b95479a5 Michael Hanselmann
981 b95479a5 Michael Hanselmann
    while opctx.jobdeps:
982 b95479a5 Michael Hanselmann
      (dep_job_id, dep_status) = opctx.jobdeps[0]
983 b95479a5 Michael Hanselmann
984 b95479a5 Michael Hanselmann
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
985 b95479a5 Michael Hanselmann
                                                          dep_status)
986 b95479a5 Michael Hanselmann
      assert ht.TNonEmptyString(depmsg), "No dependency message"
987 b95479a5 Michael Hanselmann
988 b95479a5 Michael Hanselmann
      logging.info("%s: %s", opctx.log_prefix, depmsg)
989 b95479a5 Michael Hanselmann
990 b95479a5 Michael Hanselmann
      if depresult == _JobDependencyManager.CONTINUE:
991 b95479a5 Michael Hanselmann
        # Remove dependency and continue
992 b95479a5 Michael Hanselmann
        opctx.jobdeps.pop(0)
993 b95479a5 Michael Hanselmann
994 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.WAIT:
995 b95479a5 Michael Hanselmann
        # Need to wait for notification, dependency tracker will re-add job
996 b95479a5 Michael Hanselmann
        # to workerpool
997 b95479a5 Michael Hanselmann
        result = True
998 b95479a5 Michael Hanselmann
        break
999 b95479a5 Michael Hanselmann
1000 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.CANCEL:
1001 b95479a5 Michael Hanselmann
        # Job was cancelled, cancel this job as well
1002 b95479a5 Michael Hanselmann
        job.Cancel()
1003 b95479a5 Michael Hanselmann
        assert op.status == constants.OP_STATUS_CANCELING
1004 b95479a5 Michael Hanselmann
        break
1005 b95479a5 Michael Hanselmann
1006 b95479a5 Michael Hanselmann
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1007 b95479a5 Michael Hanselmann
                         _JobDependencyManager.ERROR):
1008 b95479a5 Michael Hanselmann
        # Job failed or there was an error, this job must fail
1009 b95479a5 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1010 b95479a5 Michael Hanselmann
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1011 b95479a5 Michael Hanselmann
        break
1012 b95479a5 Michael Hanselmann
1013 b95479a5 Michael Hanselmann
      else:
1014 b95479a5 Michael Hanselmann
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1015 b95479a5 Michael Hanselmann
                                     depresult)
1016 b95479a5 Michael Hanselmann
1017 b95479a5 Michael Hanselmann
    return result
1018 b95479a5 Michael Hanselmann
1019 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1020 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1021 be760ba8 Michael Hanselmann

1022 be760ba8 Michael Hanselmann
    """
1023 b80cc518 Michael Hanselmann
    op = opctx.op
1024 b80cc518 Michael Hanselmann
1025 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1026 be760ba8 Michael Hanselmann
1027 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1028 26d3fd2f Michael Hanselmann
1029 be760ba8 Michael Hanselmann
    try:
1030 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1031 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1032 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1033 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1034 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1035 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1036 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1037 9e49dfc5 Michael Hanselmann
1038 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1039 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1040 9e49dfc5 Michael Hanselmann
1041 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1042 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1043 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1044 9e49dfc5 Michael Hanselmann
1045 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1046 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1047 be760ba8 Michael Hanselmann
    except CancelJob:
1048 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1049 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1050 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1051 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1052 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1053 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1054 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1055 be760ba8 Michael Hanselmann
    else:
1056 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1057 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1058 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1059 be760ba8 Michael Hanselmann
1060 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1061 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1062 be760ba8 Michael Hanselmann

1063 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1064 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1065 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1066 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1067 be760ba8 Michael Hanselmann

1068 be760ba8 Michael Hanselmann
    """
1069 be760ba8 Michael Hanselmann
    queue = self.queue
1070 be760ba8 Michael Hanselmann
    job = self.job
1071 be760ba8 Michael Hanselmann
1072 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1073 be760ba8 Michael Hanselmann
1074 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1075 be760ba8 Michael Hanselmann
    try:
1076 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1077 be760ba8 Michael Hanselmann
1078 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1079 c0f6d0d8 Michael Hanselmann
1080 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1081 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1082 75d81fc8 Michael Hanselmann
        return self.FINISHED
1083 66bd7445 Michael Hanselmann
1084 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1085 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1086 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1087 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1088 26d3fd2f Michael Hanselmann
      else:
1089 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1090 26d3fd2f Michael Hanselmann
          _nextop_fn()
1091 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1092 26d3fd2f Michael Hanselmann
1093 b80cc518 Michael Hanselmann
      op = opctx.op
1094 be760ba8 Michael Hanselmann
1095 be760ba8 Michael Hanselmann
      # Consistency check
1096 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1097 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1098 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1099 be760ba8 Michael Hanselmann
1100 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1101 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1102 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1103 be760ba8 Michael Hanselmann
1104 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1105 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1106 26d3fd2f Michael Hanselmann
1107 b95479a5 Michael Hanselmann
      waitjob = None
1108 b95479a5 Michael Hanselmann
1109 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1110 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1111 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1112 30c945d0 Michael Hanselmann
1113 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1114 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1115 5fd6b694 Michael Hanselmann
          # Write to disk
1116 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1117 be760ba8 Michael Hanselmann
1118 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1119 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1120 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1121 b95479a5 Michael Hanselmann
        assert waitjob is None
1122 b95479a5 Michael Hanselmann
1123 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1124 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1125 be760ba8 Michael Hanselmann
1126 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1127 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1128 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1129 be760ba8 Michael Hanselmann
1130 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1131 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1132 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1133 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1134 be760ba8 Michael Hanselmann
1135 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1136 b95479a5 Michael Hanselmann
1137 b95479a5 Michael Hanselmann
          queue.release()
1138 b95479a5 Michael Hanselmann
          try:
1139 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1140 b95479a5 Michael Hanselmann
          finally:
1141 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1142 b95479a5 Michael Hanselmann
1143 b95479a5 Michael Hanselmann
          op.status = op_status
1144 b95479a5 Michael Hanselmann
          op.result = op_result
1145 b95479a5 Michael Hanselmann
1146 b95479a5 Michael Hanselmann
          assert not waitjob
1147 be760ba8 Michael Hanselmann
1148 47099cd1 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITING:
1149 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1150 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1151 be760ba8 Michael Hanselmann
        else:
1152 26d3fd2f Michael Hanselmann
          # Finalize opcode
1153 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1154 be760ba8 Michael Hanselmann
1155 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1156 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1157 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1158 26d3fd2f Michael Hanselmann
          else:
1159 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1160 be760ba8 Michael Hanselmann
1161 47099cd1 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1162 be760ba8 Michael Hanselmann
        finalize = False
1163 be760ba8 Michael Hanselmann
1164 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1165 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1166 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1167 be760ba8 Michael Hanselmann
1168 26d3fd2f Michael Hanselmann
        # Keep around for another round
1169 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1170 be760ba8 Michael Hanselmann
1171 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1172 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1173 be760ba8 Michael Hanselmann
1174 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1175 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1176 be760ba8 Michael Hanselmann
1177 be760ba8 Michael Hanselmann
      else:
1178 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1179 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1180 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1181 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1182 26d3fd2f Michael Hanselmann
1183 26d3fd2f Michael Hanselmann
        # Reset context
1184 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1185 26d3fd2f Michael Hanselmann
1186 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1187 26d3fd2f Michael Hanselmann
          finalize = False
1188 26d3fd2f Michael Hanselmann
1189 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1190 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1191 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1192 26d3fd2f Michael Hanselmann
1193 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1194 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1195 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1196 26d3fd2f Michael Hanselmann
          finalize = True
1197 be760ba8 Michael Hanselmann
1198 26d3fd2f Michael Hanselmann
          # Consistency check
1199 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1200 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1201 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1202 be760ba8 Michael Hanselmann
1203 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1204 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1205 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1206 26d3fd2f Michael Hanselmann
          finalize = True
1207 26d3fd2f Michael Hanselmann
1208 26d3fd2f Michael Hanselmann
        else:
1209 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1210 26d3fd2f Michael Hanselmann
1211 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1212 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1213 66bd7445 Michael Hanselmann
          finalize = True
1214 66bd7445 Michael Hanselmann
1215 66bd7445 Michael Hanselmann
        if finalize:
1216 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1217 66bd7445 Michael Hanselmann
          job.Finalize()
1218 26d3fd2f Michael Hanselmann
1219 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1220 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1221 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1222 be760ba8 Michael Hanselmann
1223 b95479a5 Michael Hanselmann
        assert not waitjob
1224 b95479a5 Michael Hanselmann
1225 66bd7445 Michael Hanselmann
        if finalize:
1226 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1227 75d81fc8 Michael Hanselmann
          return self.FINISHED
1228 be760ba8 Michael Hanselmann
1229 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1230 b95479a5 Michael Hanselmann
1231 75d81fc8 Michael Hanselmann
      if waitjob:
1232 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1233 75d81fc8 Michael Hanselmann
      else:
1234 75d81fc8 Michael Hanselmann
        return self.DEFER
1235 be760ba8 Michael Hanselmann
    finally:
1236 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1237 be760ba8 Michael Hanselmann
      queue.release()
1238 be760ba8 Michael Hanselmann
1239 be760ba8 Michael Hanselmann
1240 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
1241 df5a5730 Michael Hanselmann
  """Looks at a result from L{_JobProcessor} for a job.
1242 df5a5730 Michael Hanselmann

1243 df5a5730 Michael Hanselmann
  To be used in a L{_JobQueueWorker}.
1244 df5a5730 Michael Hanselmann

1245 df5a5730 Michael Hanselmann
  """
1246 df5a5730 Michael Hanselmann
  if result == _JobProcessor.FINISHED:
1247 df5a5730 Michael Hanselmann
    # Notify waiting jobs
1248 df5a5730 Michael Hanselmann
    depmgr.NotifyWaiters(job.id)
1249 df5a5730 Michael Hanselmann
1250 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.DEFER:
1251 df5a5730 Michael Hanselmann
    # Schedule again
1252 df5a5730 Michael Hanselmann
    raise workerpool.DeferTask(priority=job.CalcPriority())
1253 df5a5730 Michael Hanselmann
1254 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.WAITDEP:
1255 df5a5730 Michael Hanselmann
    # No-op, dependency manager will re-schedule
1256 df5a5730 Michael Hanselmann
    pass
1257 df5a5730 Michael Hanselmann
1258 df5a5730 Michael Hanselmann
  else:
1259 df5a5730 Michael Hanselmann
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1260 df5a5730 Michael Hanselmann
                                 (result, ))
1261 df5a5730 Michael Hanselmann
1262 df5a5730 Michael Hanselmann
1263 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
1264 031a3e57 Michael Hanselmann
  """The actual job workers.
1265 031a3e57 Michael Hanselmann

1266 031a3e57 Michael Hanselmann
  """
1267 b459a848 Andrea Spadaccini
  def RunTask(self, job): # pylint: disable=W0221
1268 e2715f69 Michael Hanselmann
    """Job executor.
1269 e2715f69 Michael Hanselmann

1270 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1271 ea03467c Iustin Pop
    @param job: the job to be processed
1272 ea03467c Iustin Pop

1273 e2715f69 Michael Hanselmann
    """
1274 f8a4adfa Michael Hanselmann
    assert job.writable, "Expected writable job"
1275 f8a4adfa Michael Hanselmann
1276 b95479a5 Michael Hanselmann
    # Ensure only one worker is active on a single job. If a job registers for
1277 b95479a5 Michael Hanselmann
    # a dependency job, and the other job notifies before the first worker is
1278 b95479a5 Michael Hanselmann
    # done, the job can end up in the tasklist more than once.
1279 b95479a5 Michael Hanselmann
    job.processor_lock.acquire()
1280 b95479a5 Michael Hanselmann
    try:
1281 b95479a5 Michael Hanselmann
      return self._RunTaskInner(job)
1282 b95479a5 Michael Hanselmann
    finally:
1283 b95479a5 Michael Hanselmann
      job.processor_lock.release()
1284 b95479a5 Michael Hanselmann
1285 b95479a5 Michael Hanselmann
  def _RunTaskInner(self, job):
1286 b95479a5 Michael Hanselmann
    """Executes a job.
1287 b95479a5 Michael Hanselmann

1288 b95479a5 Michael Hanselmann
    Must be called with per-job lock acquired.
1289 b95479a5 Michael Hanselmann

1290 b95479a5 Michael Hanselmann
    """
1291 be760ba8 Michael Hanselmann
    queue = job.queue
1292 be760ba8 Michael Hanselmann
    assert queue == self.pool.queue
1293 be760ba8 Michael Hanselmann
1294 0aeeb6e3 Michael Hanselmann
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1295 0aeeb6e3 Michael Hanselmann
    setname_fn(None)
1296 daba67c7 Michael Hanselmann
1297 be760ba8 Michael Hanselmann
    proc = mcpu.Processor(queue.context, job.id)
1298 be760ba8 Michael Hanselmann
1299 0aeeb6e3 Michael Hanselmann
    # Create wrapper for setting thread name
1300 0aeeb6e3 Michael Hanselmann
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1301 0aeeb6e3 Michael Hanselmann
                                    proc.ExecOpCode)
1302 0aeeb6e3 Michael Hanselmann
1303 df5a5730 Michael Hanselmann
    _EvaluateJobProcessorResult(queue.depmgr, job,
1304 df5a5730 Michael Hanselmann
                                _JobProcessor(queue, wrap_execop_fn, job)())
1305 75d81fc8 Michael Hanselmann
1306 0aeeb6e3 Michael Hanselmann
  @staticmethod
1307 0aeeb6e3 Michael Hanselmann
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1308 0aeeb6e3 Michael Hanselmann
    """Updates the worker thread name to include a short summary of the opcode.
1309 0aeeb6e3 Michael Hanselmann

1310 0aeeb6e3 Michael Hanselmann
    @param setname_fn: Callable setting worker thread name
1311 0aeeb6e3 Michael Hanselmann
    @param execop_fn: Callable for executing opcode (usually
1312 0aeeb6e3 Michael Hanselmann
                      L{mcpu.Processor.ExecOpCode})
1313 0aeeb6e3 Michael Hanselmann

1314 0aeeb6e3 Michael Hanselmann
    """
1315 0aeeb6e3 Michael Hanselmann
    setname_fn(op)
1316 0aeeb6e3 Michael Hanselmann
    try:
1317 0aeeb6e3 Michael Hanselmann
      return execop_fn(op, *args, **kwargs)
1318 0aeeb6e3 Michael Hanselmann
    finally:
1319 0aeeb6e3 Michael Hanselmann
      setname_fn(None)
1320 0aeeb6e3 Michael Hanselmann
1321 0aeeb6e3 Michael Hanselmann
  @staticmethod
1322 0aeeb6e3 Michael Hanselmann
  def _GetWorkerName(job, op):
1323 0aeeb6e3 Michael Hanselmann
    """Sets the worker thread name.
1324 0aeeb6e3 Michael Hanselmann

1325 0aeeb6e3 Michael Hanselmann
    @type job: L{_QueuedJob}
1326 0aeeb6e3 Michael Hanselmann
    @type op: L{opcodes.OpCode}
1327 0aeeb6e3 Michael Hanselmann

1328 0aeeb6e3 Michael Hanselmann
    """
1329 0aeeb6e3 Michael Hanselmann
    parts = ["Job%s" % job.id]
1330 0aeeb6e3 Michael Hanselmann
1331 0aeeb6e3 Michael Hanselmann
    if op:
1332 0aeeb6e3 Michael Hanselmann
      parts.append(op.TinySummary())
1333 0aeeb6e3 Michael Hanselmann
1334 0aeeb6e3 Michael Hanselmann
    return "/".join(parts)
1335 0aeeb6e3 Michael Hanselmann
1336 e2715f69 Michael Hanselmann
1337 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
1338 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
1339 ea03467c Iustin Pop

1340 ea03467c Iustin Pop
  """
1341 5bdce580 Michael Hanselmann
  def __init__(self, queue):
1342 0aeeb6e3 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("Jq",
1343 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
1344 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
1345 5bdce580 Michael Hanselmann
    self.queue = queue
1346 e2715f69 Michael Hanselmann
1347 e2715f69 Michael Hanselmann
1348 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1349 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1350 b95479a5 Michael Hanselmann

1351 b95479a5 Michael Hanselmann
  """
1352 b95479a5 Michael Hanselmann
  (WAIT,
1353 b95479a5 Michael Hanselmann
   ERROR,
1354 b95479a5 Michael Hanselmann
   CANCEL,
1355 b95479a5 Michael Hanselmann
   CONTINUE,
1356 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1357 b95479a5 Michael Hanselmann
1358 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1359 b95479a5 Michael Hanselmann
    """Initializes this class.
1360 b95479a5 Michael Hanselmann

1361 b95479a5 Michael Hanselmann
    """
1362 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1363 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1364 b95479a5 Michael Hanselmann
1365 b95479a5 Michael Hanselmann
    self._waiters = {}
1366 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1367 b95479a5 Michael Hanselmann
1368 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1369 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1370 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1371 fcb21ad7 Michael Hanselmann

1372 fcb21ad7 Michael Hanselmann
    @type requested: set
1373 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1374 fcb21ad7 Michael Hanselmann

1375 fcb21ad7 Michael Hanselmann
    """
1376 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1377 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1378 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1379 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1380 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1381 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1382 fcb21ad7 Michael Hanselmann
            if waiters]
1383 fcb21ad7 Michael Hanselmann
1384 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1385 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1386 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1387 b95479a5 Michael Hanselmann

1388 b95479a5 Michael Hanselmann
    """
1389 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1390 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1391 b95479a5 Michael Hanselmann
1392 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1393 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1394 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1395 b95479a5 Michael Hanselmann

1396 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1397 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1398 b95479a5 Michael Hanselmann

1399 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1400 b95479a5 Michael Hanselmann
    @param job: Job object
1401 b95479a5 Michael Hanselmann
    @type dep_job_id: string
1402 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1403 b95479a5 Michael Hanselmann
    @type dep_status: list
1404 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1405 b95479a5 Michael Hanselmann

1406 b95479a5 Michael Hanselmann
    """
1407 b95479a5 Michael Hanselmann
    assert ht.TString(job.id)
1408 b95479a5 Michael Hanselmann
    assert ht.TString(dep_job_id)
1409 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1410 b95479a5 Michael Hanselmann
1411 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1412 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1413 b95479a5 Michael Hanselmann
1414 b95479a5 Michael Hanselmann
    # Get status of dependency job
1415 b95479a5 Michael Hanselmann
    try:
1416 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1417 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1418 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1419 b95479a5 Michael Hanselmann
1420 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1421 b95479a5 Michael Hanselmann
1422 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1423 b95479a5 Michael Hanselmann
1424 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1425 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1426 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1427 b95479a5 Michael Hanselmann
      return (self.WAIT,
1428 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1429 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1430 b95479a5 Michael Hanselmann
1431 b95479a5 Michael Hanselmann
    # Remove from waiters list
1432 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1433 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1434 b95479a5 Michael Hanselmann
1435 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1436 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1437 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1438 b95479a5 Michael Hanselmann
1439 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1440 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1441 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1442 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1443 b95479a5 Michael Hanselmann
1444 b95479a5 Michael Hanselmann
    else:
1445 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1446 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1447 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1448 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1449 b95479a5 Michael Hanselmann
1450 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1451 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1452 37d76f1e Michael Hanselmann

1453 37d76f1e Michael Hanselmann
    """
1454 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1455 37d76f1e Michael Hanselmann
                   if not waiters]:
1456 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1457 37d76f1e Michael Hanselmann
1458 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1459 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1460 b95479a5 Michael Hanselmann

1461 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1462 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1463 b95479a5 Michael Hanselmann
    @type job_id: string
1464 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1465 b95479a5 Michael Hanselmann

1466 b95479a5 Michael Hanselmann
    """
1467 b95479a5 Michael Hanselmann
    assert ht.TString(job_id)
1468 b95479a5 Michael Hanselmann
1469 37d76f1e Michael Hanselmann
    self._lock.acquire()
1470 37d76f1e Michael Hanselmann
    try:
1471 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1472 37d76f1e Michael Hanselmann
1473 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1474 37d76f1e Michael Hanselmann
    finally:
1475 37d76f1e Michael Hanselmann
      self._lock.release()
1476 37d76f1e Michael Hanselmann
1477 b95479a5 Michael Hanselmann
    if jobs:
1478 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1479 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1480 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1481 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1482 b95479a5 Michael Hanselmann
1483 b95479a5 Michael Hanselmann
1484 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1485 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1486 ea03467c Iustin Pop

1487 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1488 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1489 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1490 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1491 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1492 ea03467c Iustin Pop

1493 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1494 f1da30e6 Michael Hanselmann

1495 6c881c52 Iustin Pop
  Example::
1496 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1497 6c881c52 Iustin Pop
    @_RequireOpenQueue
1498 6c881c52 Iustin Pop
    def Example(self):
1499 6c881c52 Iustin Pop
      pass
1500 db37da70 Michael Hanselmann

1501 6c881c52 Iustin Pop
  """
1502 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1503 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1504 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1505 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1506 6c881c52 Iustin Pop
  return wrapper
1507 db37da70 Michael Hanselmann
1508 db37da70 Michael Hanselmann
1509 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1510 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1511 c8d0be94 Michael Hanselmann

1512 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1513 c8d0be94 Michael Hanselmann

1514 c8d0be94 Michael Hanselmann
  """
1515 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1516 c8d0be94 Michael Hanselmann
    """Wrapper function.
1517 c8d0be94 Michael Hanselmann

1518 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1519 c8d0be94 Michael Hanselmann

1520 c8d0be94 Michael Hanselmann
    """
1521 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1522 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1523 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1524 c8d0be94 Michael Hanselmann
    if self._drained:
1525 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1526 6d5ea385 Michael Hanselmann
1527 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1528 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1529 6d5ea385 Michael Hanselmann
1530 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1531 c8d0be94 Michael Hanselmann
  return wrapper
1532 c8d0be94 Michael Hanselmann
1533 c8d0be94 Michael Hanselmann
1534 6c881c52 Iustin Pop
class JobQueue(object):
1535 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1536 db37da70 Michael Hanselmann

1537 6c881c52 Iustin Pop
  """
1538 85f03e0d Michael Hanselmann
  def __init__(self, context):
1539 ea03467c Iustin Pop
    """Constructor for JobQueue.
1540 ea03467c Iustin Pop

1541 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
1542 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
1543 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
1544 ea03467c Iustin Pop
    running).
1545 ea03467c Iustin Pop

1546 ea03467c Iustin Pop
    @type context: GanetiContext
1547 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
1548 ea03467c Iustin Pop
        data and other ganeti objects
1549 ea03467c Iustin Pop

1550 ea03467c Iustin Pop
    """
1551 5bdce580 Michael Hanselmann
    self.context = context
1552 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
1553 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
1554 f1da30e6 Michael Hanselmann
1555 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
1556 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
1557 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
1558 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
1559 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
1560 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
1561 ebb80afa Guido Trotter
1562 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
1563 ebb80afa Guido Trotter
    self.release = self._lock.release
1564 85f03e0d Michael Hanselmann
1565 6d5ea385 Michael Hanselmann
    # Accept jobs by default
1566 6d5ea385 Michael Hanselmann
    self._accepting_jobs = True
1567 6d5ea385 Michael Hanselmann
1568 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
1569 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
1570 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1571 f1da30e6 Michael Hanselmann
1572 04ab05ce Michael Hanselmann
    # Read serial file
1573 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
1574 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
1575 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
1576 c4beba1c Iustin Pop
1577 23752136 Michael Hanselmann
    # Get initial list of nodes
1578 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
1579 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
1580 59303563 Iustin Pop
                       if n.master_candidate)
1581 8e00939c Michael Hanselmann
1582 8e00939c Michael Hanselmann
    # Remove master node
1583 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
1584 23752136 Michael Hanselmann
1585 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
1586 23752136 Michael Hanselmann
1587 6d5ea385 Michael Hanselmann
    self._queue_size = None
1588 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1589 6d5ea385 Michael Hanselmann
    assert ht.TInt(self._queue_size)
1590 ff699aa9 Michael Hanselmann
    self._drained = jstore.CheckDrainFlag()
1591 20571a26 Guido Trotter
1592 b95479a5 Michael Hanselmann
    # Job dependencies
1593 b95479a5 Michael Hanselmann
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1594 b95479a5 Michael Hanselmann
                                        self._EnqueueJobs)
1595 fcb21ad7 Michael Hanselmann
    self.context.glm.AddToLockMonitor(self.depmgr)
1596 b95479a5 Michael Hanselmann
1597 85f03e0d Michael Hanselmann
    # Setup worker pool
1598 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
1599 85f03e0d Michael Hanselmann
    try:
1600 de9d02c7 Michael Hanselmann
      self._InspectQueue()
1601 de9d02c7 Michael Hanselmann
    except:
1602 de9d02c7 Michael Hanselmann
      self._wpool.TerminateWorkers()
1603 de9d02c7 Michael Hanselmann
      raise
1604 711b5124 Michael Hanselmann
1605 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1606 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
1607 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
1608 de9d02c7 Michael Hanselmann
    """Loads the whole job queue and resumes unfinished jobs.
1609 de9d02c7 Michael Hanselmann

1610 de9d02c7 Michael Hanselmann
    This function needs the lock here because WorkerPool.AddTask() may start a
1611 de9d02c7 Michael Hanselmann
    job while we're still doing our work.
1612 711b5124 Michael Hanselmann

1613 de9d02c7 Michael Hanselmann
    """
1614 de9d02c7 Michael Hanselmann
    logging.info("Inspecting job queue")
1615 de9d02c7 Michael Hanselmann
1616 7b5c4a69 Michael Hanselmann
    restartjobs = []
1617 7b5c4a69 Michael Hanselmann
1618 de9d02c7 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked()
1619 de9d02c7 Michael Hanselmann
    jobs_count = len(all_job_ids)
1620 de9d02c7 Michael Hanselmann
    lastinfo = time.time()
1621 de9d02c7 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1622 de9d02c7 Michael Hanselmann
      # Give an update every 1000 jobs or 10 seconds
1623 de9d02c7 Michael Hanselmann
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1624 de9d02c7 Michael Hanselmann
          idx == (jobs_count - 1)):
1625 de9d02c7 Michael Hanselmann
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1626 de9d02c7 Michael Hanselmann
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1627 711b5124 Michael Hanselmann
        lastinfo = time.time()
1628 94ed59a5 Iustin Pop
1629 de9d02c7 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1630 85f03e0d Michael Hanselmann
1631 de9d02c7 Michael Hanselmann
      # a failure in loading the job can cause 'None' to be returned
1632 de9d02c7 Michael Hanselmann
      if job is None:
1633 de9d02c7 Michael Hanselmann
        continue
1634 85f03e0d Michael Hanselmann
1635 de9d02c7 Michael Hanselmann
      status = job.CalcStatus()
1636 711b5124 Michael Hanselmann
1637 320d1daf Michael Hanselmann
      if status == constants.JOB_STATUS_QUEUED:
1638 7b5c4a69 Michael Hanselmann
        restartjobs.append(job)
1639 de9d02c7 Michael Hanselmann
1640 de9d02c7 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING,
1641 47099cd1 Michael Hanselmann
                      constants.JOB_STATUS_WAITING,
1642 de9d02c7 Michael Hanselmann
                      constants.JOB_STATUS_CANCELING):
1643 de9d02c7 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
1644 320d1daf Michael Hanselmann
1645 47099cd1 Michael Hanselmann
        if status == constants.JOB_STATUS_WAITING:
1646 320d1daf Michael Hanselmann
          # Restart job
1647 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1648 320d1daf Michael Hanselmann
          restartjobs.append(job)
1649 320d1daf Michael Hanselmann
        else:
1650 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1651 320d1daf Michael Hanselmann
                                "Unclean master daemon shutdown")
1652 45df0793 Michael Hanselmann
          job.Finalize()
1653 320d1daf Michael Hanselmann
1654 de9d02c7 Michael Hanselmann
        self.UpdateJobUnlocked(job)
1655 de9d02c7 Michael Hanselmann
1656 7b5c4a69 Michael Hanselmann
    if restartjobs:
1657 7b5c4a69 Michael Hanselmann
      logging.info("Restarting %s jobs", len(restartjobs))
1658 75d81fc8 Michael Hanselmann
      self._EnqueueJobsUnlocked(restartjobs)
1659 7b5c4a69 Michael Hanselmann
1660 de9d02c7 Michael Hanselmann
    logging.info("Job queue inspection finished")
1661 85f03e0d Michael Hanselmann
1662 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
1663 fb1ffbca Michael Hanselmann
    """Gets RPC runner with context.
1664 fb1ffbca Michael Hanselmann

1665 fb1ffbca Michael Hanselmann
    """
1666 fb1ffbca Michael Hanselmann
    return rpc.JobQueueRunner(self.context, address_list)
1667 fb1ffbca Michael Hanselmann
1668 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1669 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1670 99aabbed Iustin Pop
  def AddNode(self, node):
1671 99aabbed Iustin Pop
    """Register a new node with the queue.
1672 99aabbed Iustin Pop

1673 99aabbed Iustin Pop
    @type node: L{objects.Node}
1674 99aabbed Iustin Pop
    @param node: the node object to be added
1675 99aabbed Iustin Pop

1676 99aabbed Iustin Pop
    """
1677 99aabbed Iustin Pop
    node_name = node.name
1678 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1679 23752136 Michael Hanselmann
1680 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1681 fb1ffbca Michael Hanselmann
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1682 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1683 c8457ce7 Iustin Pop
    if msg:
1684 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1685 c8457ce7 Iustin Pop
                      node_name, msg)
1686 23752136 Michael Hanselmann
1687 59303563 Iustin Pop
    if not node.master_candidate:
1688 59303563 Iustin Pop
      # remove if existing, ignoring errors
1689 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1690 59303563 Iustin Pop
      # and skip the replication of the job ids
1691 59303563 Iustin Pop
      return
1692 59303563 Iustin Pop
1693 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1694 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1695 23752136 Michael Hanselmann
1696 d2e03a33 Michael Hanselmann
    # Upload current serial file
1697 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1698 d2e03a33 Michael Hanselmann
1699 fb1ffbca Michael Hanselmann
    # Static address list
1700 fb1ffbca Michael Hanselmann
    addrs = [node.primary_ip]
1701 fb1ffbca Michael Hanselmann
1702 d2e03a33 Michael Hanselmann
    for file_name in files:
1703 9f774ee8 Michael Hanselmann
      # Read file content
1704 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1705 9f774ee8 Michael Hanselmann
1706 fb1ffbca Michael Hanselmann
      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1707 fb1ffbca Michael Hanselmann
                                                        content)
1708 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1709 c8457ce7 Iustin Pop
      if msg:
1710 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1711 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1712 d2e03a33 Michael Hanselmann
1713 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1714 d2e03a33 Michael Hanselmann
1715 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1716 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1717 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1718 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1719 ea03467c Iustin Pop

1720 ea03467c Iustin Pop
    @type node_name: str
1721 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1722 ea03467c Iustin Pop

1723 ea03467c Iustin Pop
    """
1724 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1725 23752136 Michael Hanselmann
1726 7e950d31 Iustin Pop
  @staticmethod
1727 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1728 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1729 ea03467c Iustin Pop

1730 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1731 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1732 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1733 ea03467c Iustin Pop

1734 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1735 ea03467c Iustin Pop
    @type nodes: list
1736 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1737 ea03467c Iustin Pop
    @type failmsg: str
1738 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1739 ea03467c Iustin Pop

1740 ea03467c Iustin Pop
    """
1741 e74798c1 Michael Hanselmann
    failed = []
1742 e74798c1 Michael Hanselmann
    success = []
1743 e74798c1 Michael Hanselmann
1744 e74798c1 Michael Hanselmann
    for node in nodes:
1745 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1746 c8457ce7 Iustin Pop
      if msg:
1747 e74798c1 Michael Hanselmann
        failed.append(node)
1748 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1749 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1750 c8457ce7 Iustin Pop
      else:
1751 c8457ce7 Iustin Pop
        success.append(node)
1752 e74798c1 Michael Hanselmann
1753 e74798c1 Michael Hanselmann
    # +1 for the master node
1754 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1755 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1756 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1757 e74798c1 Michael Hanselmann
1758 99aabbed Iustin Pop
  def _GetNodeIp(self):
1759 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1760 99aabbed Iustin Pop

1761 ea03467c Iustin Pop
    @rtype: (list, list)
1762 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1763 ea03467c Iustin Pop
        names and the second one with the node addresses
1764 ea03467c Iustin Pop

1765 99aabbed Iustin Pop
    """
1766 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1767 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1768 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1769 99aabbed Iustin Pop
    return name_list, addr_list
1770 99aabbed Iustin Pop
1771 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1772 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1773 8e00939c Michael Hanselmann

1774 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1775 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1776 ea03467c Iustin Pop

1777 ea03467c Iustin Pop
    @type file_name: str
1778 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1779 ea03467c Iustin Pop
    @type data: str
1780 ea03467c Iustin Pop
    @param data: the new contents of the file
1781 4c36bdf5 Guido Trotter
    @type replicate: boolean
1782 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1783 ea03467c Iustin Pop

1784 8e00939c Michael Hanselmann
    """
1785 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1786 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1787 82b22e19 René Nussbaumer
                    gid=getents.masterd_gid)
1788 8e00939c Michael Hanselmann
1789 4c36bdf5 Guido Trotter
    if replicate:
1790 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1791 fb1ffbca Michael Hanselmann
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1792 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1793 23752136 Michael Hanselmann
1794 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1795 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1796 ea03467c Iustin Pop

1797 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1798 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1799 ea03467c Iustin Pop

1800 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1801 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1802 ea03467c Iustin Pop

1803 ea03467c Iustin Pop
    """
1804 dd875d32 Michael Hanselmann
    # Rename them locally
1805 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1806 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1807 abc1f2ce Michael Hanselmann
1808 dd875d32 Michael Hanselmann
    # ... and on all nodes
1809 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1810 fb1ffbca Michael Hanselmann
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1811 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1812 abc1f2ce Michael Hanselmann
1813 7e950d31 Iustin Pop
  @staticmethod
1814 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
1815 ea03467c Iustin Pop
    """Convert a job ID to string format.
1816 ea03467c Iustin Pop

1817 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
1818 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
1819 ea03467c Iustin Pop
    abstract this change.
1820 ea03467c Iustin Pop

1821 ea03467c Iustin Pop
    @type job_id: int or long
1822 ea03467c Iustin Pop
    @param job_id: the numeric job id
1823 ea03467c Iustin Pop
    @rtype: str
1824 ea03467c Iustin Pop
    @return: the formatted job id
1825 ea03467c Iustin Pop

1826 ea03467c Iustin Pop
    """
1827 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
1828 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1829 85f03e0d Michael Hanselmann
    if job_id < 0:
1830 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1831 85f03e0d Michael Hanselmann
1832 85f03e0d Michael Hanselmann
    return str(job_id)
1833 85f03e0d Michael Hanselmann
1834 58b22b6e Michael Hanselmann
  @classmethod
1835 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
1836 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
1837 58b22b6e Michael Hanselmann

1838 58b22b6e Michael Hanselmann
    @type job_id: str
1839 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
1840 58b22b6e Michael Hanselmann
    @rtype: str
1841 58b22b6e Michael Hanselmann
    @return: Directory name
1842 58b22b6e Michael Hanselmann

1843 58b22b6e Michael Hanselmann
    """
1844 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1845 58b22b6e Michael Hanselmann
1846 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1847 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1848 f1da30e6 Michael Hanselmann

1849 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1850 f1da30e6 Michael Hanselmann

1851 009e73d0 Iustin Pop
    @type count: integer
1852 009e73d0 Iustin Pop
    @param count: how many serials to return
1853 ea03467c Iustin Pop
    @rtype: str
1854 ea03467c Iustin Pop
    @return: a string representing the job identifier.
1855 f1da30e6 Michael Hanselmann

1856 f1da30e6 Michael Hanselmann
    """
1857 719f8fba Michael Hanselmann
    assert ht.TPositiveInt(count)
1858 719f8fba Michael Hanselmann
1859 f1da30e6 Michael Hanselmann
    # New number
1860 009e73d0 Iustin Pop
    serial = self._last_serial + count
1861 f1da30e6 Michael Hanselmann
1862 f1da30e6 Michael Hanselmann
    # Write to file
1863 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1864 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1865 f1da30e6 Michael Hanselmann
1866 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
1867 3c88bf36 Michael Hanselmann
              for v in range(self._last_serial + 1, serial + 1)]
1868 3c88bf36 Michael Hanselmann
1869 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1870 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1871 f1da30e6 Michael Hanselmann
1872 3c88bf36 Michael Hanselmann
    assert len(result) == count
1873 3c88bf36 Michael Hanselmann
1874 009e73d0 Iustin Pop
    return result
1875 f1da30e6 Michael Hanselmann
1876 85f03e0d Michael Hanselmann
  @staticmethod
1877 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1878 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1879 ea03467c Iustin Pop

1880 ea03467c Iustin Pop
    @type job_id: str
1881 ea03467c Iustin Pop
    @param job_id: the job identifier
1882 ea03467c Iustin Pop
    @rtype: str
1883 ea03467c Iustin Pop
    @return: the path to the job file
1884 ea03467c Iustin Pop

1885 ea03467c Iustin Pop
    """
1886 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1887 f1da30e6 Michael Hanselmann
1888 58b22b6e Michael Hanselmann
  @classmethod
1889 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
1890 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1891 ea03467c Iustin Pop

1892 ea03467c Iustin Pop
    @type job_id: str
1893 ea03467c Iustin Pop
    @param job_id: the job identifier
1894 ea03467c Iustin Pop
    @rtype: str
1895 ea03467c Iustin Pop
    @return: the path to the archived job file
1896 ea03467c Iustin Pop

1897 ea03467c Iustin Pop
    """
1898 0411c011 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1899 0411c011 Iustin Pop
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1900 0cb94105 Michael Hanselmann
1901 cb66225d Michael Hanselmann
  @staticmethod
1902 cb66225d Michael Hanselmann
  def _GetJobIDsUnlocked(sort=True):
1903 911a495b Iustin Pop
    """Return all known job IDs.
1904 911a495b Iustin Pop

1905 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1906 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1907 ac0930b9 Iustin Pop
    extra IDs).
1908 ac0930b9 Iustin Pop

1909 85a1c57d Guido Trotter
    @type sort: boolean
1910 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1911 ea03467c Iustin Pop
    @rtype: list
1912 ea03467c Iustin Pop
    @return: the list of job IDs
1913 ea03467c Iustin Pop

1914 911a495b Iustin Pop
    """
1915 85a1c57d Guido Trotter
    jlist = []
1916 b5b8309d Guido Trotter
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1917 cb66225d Michael Hanselmann
      m = constants.JOB_FILE_RE.match(filename)
1918 85a1c57d Guido Trotter
      if m:
1919 85a1c57d Guido Trotter
        jlist.append(m.group(1))
1920 85a1c57d Guido Trotter
    if sort:
1921 85a1c57d Guido Trotter
      jlist = utils.NiceSort(jlist)
1922 f0d874fe Iustin Pop
    return jlist
1923 911a495b Iustin Pop
1924 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1925 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1926 ea03467c Iustin Pop

1927 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1928 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1929 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1930 ea03467c Iustin Pop

1931 ea03467c Iustin Pop
    @param job_id: the job id
1932 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1933 ea03467c Iustin Pop
    @return: either None or the job object
1934 ea03467c Iustin Pop

1935 ea03467c Iustin Pop
    """
1936 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1937 5685c1a5 Michael Hanselmann
    if job:
1938 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1939 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
1940 5685c1a5 Michael Hanselmann
      return job
1941 ac0930b9 Iustin Pop
1942 3d6c5566 Guido Trotter
    try:
1943 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
1944 aa9f8167 Iustin Pop
      if job is None:
1945 aa9f8167 Iustin Pop
        return job
1946 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1947 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1948 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1949 3d6c5566 Guido Trotter
      if old_path == new_path:
1950 3d6c5566 Guido Trotter
        # job already archived (future case)
1951 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1952 3d6c5566 Guido Trotter
      else:
1953 3d6c5566 Guido Trotter
        # non-archived case
1954 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1955 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1956 3d6c5566 Guido Trotter
      return None
1957 162c8636 Guido Trotter
1958 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
1959 c0f6d0d8 Michael Hanselmann
1960 162c8636 Guido Trotter
    self._memcache[job_id] = job
1961 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1962 162c8636 Guido Trotter
    return job
1963 162c8636 Guido Trotter
1964 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1965 162c8636 Guido Trotter
    """Load the given job file from disk.
1966 162c8636 Guido Trotter

1967 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1968 162c8636 Guido Trotter

1969 162c8636 Guido Trotter
    @type job_id: string
1970 162c8636 Guido Trotter
    @param job_id: job identifier
1971 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1972 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1973 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1974 162c8636 Guido Trotter
    @return: either None or the job object
1975 162c8636 Guido Trotter

1976 162c8636 Guido Trotter
    """
1977 c0f6d0d8 Michael Hanselmann
    path_functions = [(self._GetJobPath, True)]
1978 194c8ca4 Michael Hanselmann
1979 194c8ca4 Michael Hanselmann
    if try_archived:
1980 c0f6d0d8 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, False))
1981 194c8ca4 Michael Hanselmann
1982 194c8ca4 Michael Hanselmann
    raw_data = None
1983 c0f6d0d8 Michael Hanselmann
    writable_default = None
1984 194c8ca4 Michael Hanselmann
1985 c0f6d0d8 Michael Hanselmann
    for (fn, writable_default) in path_functions:
1986 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
1987 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
1988 194c8ca4 Michael Hanselmann
      try:
1989 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
1990 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
1991 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
1992 194c8ca4 Michael Hanselmann
          raise
1993 194c8ca4 Michael Hanselmann
      else:
1994 194c8ca4 Michael Hanselmann
        break
1995 194c8ca4 Michael Hanselmann
1996 194c8ca4 Michael Hanselmann
    if not raw_data:
1997 194c8ca4 Michael Hanselmann
      return None
1998 13998ef2 Michael Hanselmann
1999 c0f6d0d8 Michael Hanselmann
    if writable is None:
2000 c0f6d0d8 Michael Hanselmann
      writable = writable_default
2001 c0f6d0d8 Michael Hanselmann
2002 94ed59a5 Iustin Pop
    try:
2003 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2004 c0f6d0d8 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable)
2005 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2006 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2007 94ed59a5 Iustin Pop
2008 ac0930b9 Iustin Pop
    return job
2009 f1da30e6 Michael Hanselmann
2010 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2011 0f9c08dc Guido Trotter
    """Load the given job file from disk.
2012 0f9c08dc Guido Trotter

2013 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2014 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
2015 0f9c08dc Guido Trotter
    exception is logged.
2016 0f9c08dc Guido Trotter

2017 0f9c08dc Guido Trotter
    @type job_id: string
2018 0f9c08dc Guido Trotter
    @param job_id: job identifier
2019 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2020 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2021 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
2022 0f9c08dc Guido Trotter
    @return: either None or the job object
2023 0f9c08dc Guido Trotter

2024 0f9c08dc Guido Trotter
    """
2025 0f9c08dc Guido Trotter
    try:
2026 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2027 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
2028 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
2029 0f9c08dc Guido Trotter
      return None
2030 0f9c08dc Guido Trotter
2031 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2032 20571a26 Guido Trotter
    """Update the queue size.
2033 20571a26 Guido Trotter

2034 20571a26 Guido Trotter
    """
2035 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2036 20571a26 Guido Trotter
2037 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2038 20571a26 Guido Trotter
  @_RequireOpenQueue
2039 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
2040 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
2041 3ccafd0e Iustin Pop

2042 ea03467c Iustin Pop
    @type drain_flag: boolean
2043 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
2044 ea03467c Iustin Pop

2045 3ccafd0e Iustin Pop
    """
2046 ff699aa9 Michael Hanselmann
    jstore.SetDrainFlag(drain_flag)
2047 20571a26 Guido Trotter
2048 20571a26 Guido Trotter
    self._drained = drain_flag
2049 20571a26 Guido Trotter
2050 3ccafd0e Iustin Pop
    return True
2051 3ccafd0e Iustin Pop
2052 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2053 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
2054 85f03e0d Michael Hanselmann
    """Create and store a new job.
2055 f1da30e6 Michael Hanselmann

2056 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
2057 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
2058 c3f0a12f Iustin Pop

2059 009e73d0 Iustin Pop
    @type job_id: job ID
2060 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
2061 c3f0a12f Iustin Pop
    @type ops: list
2062 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
2063 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
2064 7beb1e53 Guido Trotter
    @return: the job object to be queued
2065 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2066 e71c8147 Michael Hanselmann
    @raise errors.GenericError: If an opcode is not valid
2067 c3f0a12f Iustin Pop

2068 c3f0a12f Iustin Pop
    """
2069 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2070 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
2071 f87b405e Michael Hanselmann
2072 c0f6d0d8 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops, True)
2073 f1da30e6 Michael Hanselmann
2074 e71c8147 Michael Hanselmann
    # Check priority
2075 e71c8147 Michael Hanselmann
    for idx, op in enumerate(job.ops):
2076 e71c8147 Michael Hanselmann
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2077 e71c8147 Michael Hanselmann
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2078 e71c8147 Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2079 e71c8147 Michael Hanselmann
                                  " are %s" % (idx, op.priority, allowed))
2080 e71c8147 Michael Hanselmann
2081 b247c6fc Michael Hanselmann
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2082 b247c6fc Michael Hanselmann
      if not opcodes.TNoRelativeJobDependencies(dependencies):
2083 b247c6fc Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2084 b247c6fc Michael Hanselmann
                                  " match %s: %s" %
2085 b247c6fc Michael Hanselmann
                                  (idx, opcodes.TNoRelativeJobDependencies,
2086 b247c6fc Michael Hanselmann
                                   dependencies))
2087 b247c6fc Michael Hanselmann
2088 f1da30e6 Michael Hanselmann
    # Write to disk
2089 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
2090 f1da30e6 Michael Hanselmann
2091 20571a26 Guido Trotter
    self._queue_size += 1
2092 20571a26 Guido Trotter
2093 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
2094 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
2095 ac0930b9 Iustin Pop
2096 7beb1e53 Guido Trotter
    return job
2097 f1da30e6 Michael Hanselmann
2098 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2099 2971c913 Iustin Pop
  @_RequireOpenQueue
2100 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2101 2971c913 Iustin Pop
  def SubmitJob(self, ops):
2102 2971c913 Iustin Pop
    """Create and store a new job.
2103 2971c913 Iustin Pop

2104 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2105 2971c913 Iustin Pop

2106 2971c913 Iustin Pop
    """
2107 b247c6fc Michael Hanselmann
    (job_id, ) = self._NewSerialsUnlocked(1)
2108 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2109 7beb1e53 Guido Trotter
    return job_id
2110 2971c913 Iustin Pop
2111 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2112 2971c913 Iustin Pop
  @_RequireOpenQueue
2113 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2114 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
2115 2971c913 Iustin Pop
    """Create and store multiple jobs.
2116 2971c913 Iustin Pop

2117 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2118 2971c913 Iustin Pop

2119 2971c913 Iustin Pop
    """
2120 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2121 b247c6fc Michael Hanselmann
2122 b247c6fc Michael Hanselmann
    (results, added_jobs) = \
2123 b247c6fc Michael Hanselmann
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2124 7b5c4a69 Michael Hanselmann
2125 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked(added_jobs)
2126 2971c913 Iustin Pop
2127 2971c913 Iustin Pop
    return results
2128 2971c913 Iustin Pop
2129 b247c6fc Michael Hanselmann
  @staticmethod
2130 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
2131 b247c6fc Michael Hanselmann
    """Formats errors which occurred while submitting a job.
2132 b247c6fc Michael Hanselmann

2133 b247c6fc Michael Hanselmann
    """
2134 b247c6fc Michael Hanselmann
    return ("%s; opcodes %s" %
2135 b247c6fc Michael Hanselmann
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2136 b247c6fc Michael Hanselmann
2137 b247c6fc Michael Hanselmann
  @staticmethod
2138 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
2139 b247c6fc Michael Hanselmann
    """Resolves relative job IDs in dependencies.
2140 b247c6fc Michael Hanselmann

2141 b247c6fc Michael Hanselmann
    @type resolve_fn: callable
2142 b247c6fc Michael Hanselmann
    @param resolve_fn: Function to resolve a relative job ID
2143 b247c6fc Michael Hanselmann
    @type deps: list
2144 b247c6fc Michael Hanselmann
    @param deps: Dependencies
2145 b247c6fc Michael Hanselmann
    @rtype: list
2146 b247c6fc Michael Hanselmann
    @return: Resolved dependencies
2147 b247c6fc Michael Hanselmann

2148 b247c6fc Michael Hanselmann
    """
2149 b247c6fc Michael Hanselmann
    result = []
2150 b247c6fc Michael Hanselmann
2151 b247c6fc Michael Hanselmann
    for (dep_job_id, dep_status) in deps:
2152 b247c6fc Michael Hanselmann
      if ht.TRelativeJobId(dep_job_id):
2153 b247c6fc Michael Hanselmann
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2154 b247c6fc Michael Hanselmann
        try:
2155 b247c6fc Michael Hanselmann
          job_id = resolve_fn(dep_job_id)
2156 b247c6fc Michael Hanselmann
        except IndexError:
2157 b247c6fc Michael Hanselmann
          # Abort
2158 b247c6fc Michael Hanselmann
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2159 b247c6fc Michael Hanselmann
      else:
2160 b247c6fc Michael Hanselmann
        job_id = dep_job_id
2161 b247c6fc Michael Hanselmann
2162 b247c6fc Michael Hanselmann
      result.append((job_id, dep_status))
2163 b247c6fc Michael Hanselmann
2164 b247c6fc Michael Hanselmann
    return (True, result)
2165 b247c6fc Michael Hanselmann
2166 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2167 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2168 b247c6fc Michael Hanselmann

2169 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2170 b247c6fc Michael Hanselmann

2171 b247c6fc Michael Hanselmann
    """
2172 b247c6fc Michael Hanselmann
    results = []
2173 b247c6fc Michael Hanselmann
    added_jobs = []
2174 b247c6fc Michael Hanselmann
2175 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2176 b247c6fc Michael Hanselmann
      assert reljobid < 0
2177 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2178 b247c6fc Michael Hanselmann
2179 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2180 b247c6fc Michael Hanselmann
      for op in ops:
2181 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2182 b247c6fc Michael Hanselmann
          (status, data) = \
2183 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2184 b247c6fc Michael Hanselmann
                                         op.depends)
2185 b247c6fc Michael Hanselmann
          if not status:
2186 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2187 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2188 b247c6fc Michael Hanselmann
            break
2189 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2190 b247c6fc Michael Hanselmann
          op.depends = data
2191 b247c6fc Michael Hanselmann
      else:
2192 b247c6fc Michael Hanselmann
        try:
2193 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2194 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2195 b247c6fc Michael Hanselmann
          status = False
2196 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2197 b247c6fc Michael Hanselmann
        else:
2198 b247c6fc Michael Hanselmann
          status = True
2199 b247c6fc Michael Hanselmann
          data = job_id
2200 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2201 b247c6fc Michael Hanselmann
2202 b247c6fc Michael Hanselmann
      results.append((status, data))
2203 b247c6fc Michael Hanselmann
2204 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2205 b247c6fc Michael Hanselmann
2206 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2207 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2208 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2209 7b5c4a69 Michael Hanselmann

2210 7b5c4a69 Michael Hanselmann
    @type jobs: list
2211 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2212 7b5c4a69 Michael Hanselmann

2213 7b5c4a69 Michael Hanselmann
    """
2214 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2215 75d81fc8 Michael Hanselmann
2216 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2217 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2218 75d81fc8 Michael Hanselmann

2219 75d81fc8 Michael Hanselmann
    @type jobs: list
2220 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2221 75d81fc8 Michael Hanselmann

2222 75d81fc8 Michael Hanselmann
    """
2223 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2224 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2225 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
2226 7b5c4a69 Michael Hanselmann
2227 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2228 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2229 b95479a5 Michael Hanselmann

2230 b95479a5 Michael Hanselmann
    @type job_id: string
2231 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2232 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2233 b95479a5 Michael Hanselmann

2234 b95479a5 Michael Hanselmann
    """
2235 b95479a5 Michael Hanselmann
    if not isinstance(job_id, basestring):
2236 b95479a5 Michael Hanselmann
      job_id = self._FormatJobID(job_id)
2237 b95479a5 Michael Hanselmann
2238 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2239 b95479a5 Michael Hanselmann
2240 b95479a5 Michael Hanselmann
    # Try to load from disk
2241 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2242 c0f6d0d8 Michael Hanselmann
2243 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2244 b95479a5 Michael Hanselmann
2245 b95479a5 Michael Hanselmann
    if job:
2246 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2247 b95479a5 Michael Hanselmann
2248 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2249 b95479a5 Michael Hanselmann
2250 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2251 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
2252 ea03467c Iustin Pop
    """Update a job's on disk storage.
2253 ea03467c Iustin Pop

2254 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
2255 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
2256 ea03467c Iustin Pop
    nodes.
2257 ea03467c Iustin Pop

2258 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
2259 ea03467c Iustin Pop
    @param job: the changed job
2260 4c36bdf5 Guido Trotter
    @type replicate: boolean
2261 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
2262 ea03467c Iustin Pop

2263 ea03467c Iustin Pop
    """
2264 66bd7445 Michael Hanselmann
    if __debug__:
2265 66bd7445 Michael Hanselmann
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2266 66bd7445 Michael Hanselmann
      assert (finalized ^ (job.end_timestamp is None))
2267 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't update read-only job"
2268 66bd7445 Michael Hanselmann
2269 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
2270 a182a3ed Michael Hanselmann
    data = serializer.DumpJson(job.Serialize())
2271 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
2272 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
2273 ac0930b9 Iustin Pop
2274 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2275 5c735209 Iustin Pop
                        timeout):
2276 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
2277 6c5a7090 Michael Hanselmann

2278 6c5a7090 Michael Hanselmann
    @type job_id: string
2279 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
2280 6c5a7090 Michael Hanselmann
    @type fields: list of strings
2281 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
2282 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
2283 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
2284 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
2285 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
2286 5c735209 Iustin Pop
    @type timeout: float
2287 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
2288 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
2289 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
2290 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
2291 ea03467c Iustin Pop

2292 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
2293 ea03467c Iustin Pop
        we instead return a special value,
2294 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2295 ea03467c Iustin Pop
        as such by the clients
2296 6c5a7090 Michael Hanselmann

2297 6c5a7090 Michael Hanselmann
    """
2298 c0f6d0d8 Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2299 c0f6d0d8 Michael Hanselmann
                             writable=False)
2300 989a8bee Michael Hanselmann
2301 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
2302 989a8bee Michael Hanselmann
2303 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
2304 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
2305 dfe57c22 Michael Hanselmann
2306 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2307 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2308 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
2309 188c5e0a Michael Hanselmann
    """Cancels a job.
2310 188c5e0a Michael Hanselmann

2311 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
2312 ea03467c Iustin Pop

2313 188c5e0a Michael Hanselmann
    @type job_id: string
2314 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
2315 188c5e0a Michael Hanselmann

2316 188c5e0a Michael Hanselmann
    """
2317 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
2318 188c5e0a Michael Hanselmann
2319 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2320 188c5e0a Michael Hanselmann
    if not job:
2321 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2322 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2323 fbf0262f Michael Hanselmann
2324 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Can't cancel read-only job"
2325 c0f6d0d8 Michael Hanselmann
2326 099b2870 Michael Hanselmann
    (success, msg) = job.Cancel()
2327 188c5e0a Michael Hanselmann
2328 099b2870 Michael Hanselmann
    if success:
2329 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2330 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2331 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2332 fbf0262f Michael Hanselmann
2333 099b2870 Michael Hanselmann
    return (success, msg)
2334 fbf0262f Michael Hanselmann
2335 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2336 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2337 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2338 c609f802 Michael Hanselmann

2339 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2340 25e7b43f Iustin Pop
    @param jobs: Job objects
2341 d7fd1f28 Michael Hanselmann
    @rtype: int
2342 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2343 c609f802 Michael Hanselmann

2344 c609f802 Michael Hanselmann
    """
2345 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2346 d7fd1f28 Michael Hanselmann
    rename_files = []
2347 d7fd1f28 Michael Hanselmann
    for job in jobs:
2348 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2349 c0f6d0d8 Michael Hanselmann
2350 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2351 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2352 d7fd1f28 Michael Hanselmann
        continue
2353 c609f802 Michael Hanselmann
2354 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2355 c609f802 Michael Hanselmann
2356 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2357 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2358 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2359 c609f802 Michael Hanselmann
2360 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2361 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2362 f1da30e6 Michael Hanselmann
2363 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2364 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2365 d7fd1f28 Michael Hanselmann
2366 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2367 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2368 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2369 20571a26 Guido Trotter
    # archived jobs to fix this.
2370 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2371 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2372 78d12585 Michael Hanselmann
2373 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2374 07cd723a Iustin Pop
  @_RequireOpenQueue
2375 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
2376 07cd723a Iustin Pop
    """Archives a job.
2377 07cd723a Iustin Pop

2378 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2379 ea03467c Iustin Pop

2380 07cd723a Iustin Pop
    @type job_id: string
2381 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
2382 78d12585 Michael Hanselmann
    @rtype: bool
2383 78d12585 Michael Hanselmann
    @return: Whether job was archived
2384 07cd723a Iustin Pop

2385 07cd723a Iustin Pop
    """
2386 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
2387 78d12585 Michael Hanselmann
2388 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2389 78d12585 Michael Hanselmann
    if not job:
2390 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2391 78d12585 Michael Hanselmann
      return False
2392 78d12585 Michael Hanselmann
2393 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
2394 07cd723a Iustin Pop
2395 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2396 07cd723a Iustin Pop
  @_RequireOpenQueue
2397 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
2398 07cd723a Iustin Pop
    """Archives all jobs based on age.
2399 07cd723a Iustin Pop

2400 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
2401 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
2402 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
2403 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
2404 07cd723a Iustin Pop

2405 07cd723a Iustin Pop
    @type age: int
2406 07cd723a Iustin Pop
    @param age: the minimum age in seconds
2407 07cd723a Iustin Pop

2408 07cd723a Iustin Pop
    """
2409 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
2410 07cd723a Iustin Pop
2411 07cd723a Iustin Pop
    now = time.time()
2412 f8ad5591 Michael Hanselmann
    end_time = now + timeout
2413 f8ad5591 Michael Hanselmann
    archived_count = 0
2414 f8ad5591 Michael Hanselmann
    last_touched = 0
2415 f8ad5591 Michael Hanselmann
2416 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
2417 d7fd1f28 Michael Hanselmann
    pending = []
2418 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
2419 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
2420 f8ad5591 Michael Hanselmann
2421 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
2422 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
2423 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
2424 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
2425 f8ad5591 Michael Hanselmann
        break
2426 f8ad5591 Michael Hanselmann
2427 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
2428 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
2429 f8ad5591 Michael Hanselmann
      if job:
2430 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
2431 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
2432 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
2433 f8ad5591 Michael Hanselmann
          else:
2434 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
2435 07cd723a Iustin Pop
        else:
2436 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
2437 f8ad5591 Michael Hanselmann
2438 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
2439 d7fd1f28 Michael Hanselmann
          pending.append(job)
2440 d7fd1f28 Michael Hanselmann
2441 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
2442 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
2443 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
2444 d7fd1f28 Michael Hanselmann
            pending = []
2445 f8ad5591 Michael Hanselmann
2446 d7fd1f28 Michael Hanselmann
    if pending:
2447 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
2448 07cd723a Iustin Pop
2449 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
2450 07cd723a Iustin Pop
2451 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
2452 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
2453 e2715f69 Michael Hanselmann

2454 ea03467c Iustin Pop
    @type job_ids: list
2455 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
2456 ea03467c Iustin Pop
    @type fields: list
2457 ea03467c Iustin Pop
    @param fields: names of fields to return
2458 ea03467c Iustin Pop
    @rtype: list
2459 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
2460 ea03467c Iustin Pop
        the requested fields
2461 e2715f69 Michael Hanselmann

2462 e2715f69 Michael Hanselmann
    """
2463 85f03e0d Michael Hanselmann
    jobs = []
2464 9f7b4967 Guido Trotter
    list_all = False
2465 9f7b4967 Guido Trotter
    if not job_ids:
2466 9f7b4967 Guido Trotter
      # Since files are added to/removed from the queue atomically, there's no
2467 9f7b4967 Guido Trotter
      # risk of getting the job ids in an inconsistent state.
2468 9f7b4967 Guido Trotter
      job_ids = self._GetJobIDsUnlocked()
2469 9f7b4967 Guido Trotter
      list_all = True
2470 e2715f69 Michael Hanselmann
2471 9f7b4967 Guido Trotter
    for job_id in job_ids:
2472 194c8ca4 Michael Hanselmann
      job = self.SafeLoadJobFromDisk(job_id, True)
2473 9f7b4967 Guido Trotter
      if job is not None:
2474 6a290889 Guido Trotter
        jobs.append(job.GetInfo(fields))
2475 9f7b4967 Guido Trotter
      elif not list_all:
2476 9f7b4967 Guido Trotter
        jobs.append(None)
2477 e2715f69 Michael Hanselmann
2478 85f03e0d Michael Hanselmann
    return jobs
2479 e2715f69 Michael Hanselmann
2480 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2481 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
2482 6d5ea385 Michael Hanselmann
    """Prepare to stop the job queue.
2483 6d5ea385 Michael Hanselmann

2484 6d5ea385 Michael Hanselmann
    Disables execution of jobs in the workerpool and returns whether there are
2485 6d5ea385 Michael Hanselmann
    any jobs currently running. If the latter is the case, the job queue is not
2486 6d5ea385 Michael Hanselmann
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2487 6d5ea385 Michael Hanselmann
    be called without interfering with any job. Queued and unfinished jobs will
2488 6d5ea385 Michael Hanselmann
    be resumed next time.
2489 6d5ea385 Michael Hanselmann

2490 6d5ea385 Michael Hanselmann
    Once this function has been called no new job submissions will be accepted
2491 6d5ea385 Michael Hanselmann
    (see L{_RequireNonDrainedQueue}).
2492 6d5ea385 Michael Hanselmann

2493 6d5ea385 Michael Hanselmann
    @rtype: bool
2494 6d5ea385 Michael Hanselmann
    @return: Whether there are any running jobs
2495 6d5ea385 Michael Hanselmann

2496 6d5ea385 Michael Hanselmann
    """
2497 6d5ea385 Michael Hanselmann
    if self._accepting_jobs:
2498 6d5ea385 Michael Hanselmann
      self._accepting_jobs = False
2499 6d5ea385 Michael Hanselmann
2500 6d5ea385 Michael Hanselmann
      # Tell worker pool to stop processing pending tasks
2501 6d5ea385 Michael Hanselmann
      self._wpool.SetActive(False)
2502 6d5ea385 Michael Hanselmann
2503 6d5ea385 Michael Hanselmann
    return self._wpool.HasRunningTasks()
2504 6d5ea385 Michael Hanselmann
2505 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2506 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2507 e2715f69 Michael Hanselmann
  def Shutdown(self):
2508 e2715f69 Michael Hanselmann
    """Stops the job queue.
2509 e2715f69 Michael Hanselmann

2510 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2511 ea03467c Iustin Pop

2512 e2715f69 Michael Hanselmann
    """
2513 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
2514 85f03e0d Michael Hanselmann
2515 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
2516 a71f9c7d Guido Trotter
    self._queue_filelock = None