Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 80b207df

History | View | Annotate | Download (74.8 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 942e2262 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 b95479a5 Michael Hanselmann
from ganeti import ht
60 a06c6ae8 Michael Hanselmann
from ganeti import query
61 a06c6ae8 Michael Hanselmann
from ganeti import qlang
62 e2715f69 Michael Hanselmann
63 fbf0262f Michael Hanselmann
64 1daae384 Iustin Pop
# Number of worker threads started for processing jobs (see module docstring)
JOBQUEUE_THREADS = 25
# NOTE(review): presumably the number of archived jobs kept per archive
# directory; the code using this value is not visible here -- confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# Member lock names to be passed to the @ssynchronized decorator; each value
# is the name of an attribute on the decorated method's instance
_LOCK = "_lock"
_QUEUE = "_queue"
70 99bd4f0a Guido Trotter
71 498ae1cc Iustin Pop
72 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Signals that the job being processed has to be canceled.

  Raised by L{_OpExecCallbacks} when the opcode it is handling has the
  status C{constants.OP_STATUS_CANCELING}.

  """
76 fbf0262f Michael Hanselmann
77 fbf0262f Michael Hanselmann
78 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Signals that a job must be aborted because the queue is shutting down.

  Raised by L{_OpExecCallbacks} when the queue's C{AcceptingJobsUnlocked}
  method returns a false value.

  """
82 942e2262 Michael Hanselmann
83 942e2262 Michael Hanselmann
84 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current time as a split timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
92 70552c46 Michael Hanselmann
93 70552c46 Michael Hanselmann
94 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Callable wrapper around a job query with a fixed field list.

  The L{query.Query} object is built once per instance and reused for every
  call, which is useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Prepares the query for the given fields.

    @param fields: names of the job fields to query; passed to
        L{query.Query} together with C{query.JOB_FIELDS}

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Runs the prepared query against a single job.

    @param job: job object; its C{id} attribute is used as the item name
    @return: the first (and only requested) row of the old-style query
        result

    """
    items = [(job.id, job)]
    rows = self._query.OldStyleQuery(items, sort_by_name=False)
    return rows[0]
111 a06c6ae8 Michael Hanselmann
112 a06c6ae8 Michael Hanselmann
113 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Wraps a single opcode together with its execution state.

  @ivar input: the OpCode we encapsulate
  @ivar status: the current status; C{constants.OP_STATUS_QUEUED} for a
      freshly created instance
  @ivar result: the result of the LU execution; C{None} until set
  @ivar log: the execution log, a list of tuples of the form
      C{(log_serial, timestamp, level, message)}
  @ivar priority: the opcode priority (it might change during the lifetime
      of this opcode)
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = [
    "input", "status", "result", "log", "priority",
    "start_timestamp", "exec_timestamp", "end_timestamp",
    "__weakref__",
    ]

  def __init__(self, op):
    """Wraps a not yet executed opcode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

    # Nothing has happened to this opcode yet
    self.start_timestamp = self.exec_timestamp = self.end_timestamp = None

    # Initial priority only; opcodes without the attribute get the default
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    C{__init__} is bypassed; every attribute is taken from C{state}.

    @type state: dict
    @param state: the serialized state, as produced by L{Serialize}; the
        timestamp entries and the priority may be missing
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    restored.status = state["status"]
    restored.result = state["result"]
    restored.log = state["log"]

    # Optional entries: missing timestamps become None
    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name))

    restored.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return restored

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state; the opcode itself
        is stored as the result of its C{__getstate__} method

    """
    state = {"input": self.input.__getstate__()}

    # All remaining attributes are stored under their own name
    for name in ("status", "result", "log",
                 "start_timestamp", "exec_timestamp", "end_timestamp",
                 "priority"):
      state[name] = getattr(self, name)

    return state
186 f1048938 Iustin Pop
187 e2715f69 Michael Hanselmann
188 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: serial of the most recent log entry (0 when nothing has
      been logged yet); it is incremented before a new entry is appended
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified
  @ivar ops_iter: in-memory only, not serialized; initialised to C{None}
  @ivar cur_opctx: in-memory only, not serialized; initialised to C{None}
  @ivar processor_lock: a C{threading.Lock} for writable jobs, C{None} for
      read-only ones

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: if C{ops} is empty

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    Shared by L{__init__} and L{Restore}; sets the attributes which are not
    part of the serialized state.

    @type obj: L{_QueuedJob}
    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    """Returns a description containing class, job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide serial is the highest serial found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    Only the job ID, the opcodes and the three timestamps are included; the
    in-memory attributes set by L{_InitInMemory} are not.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped; if all opcodes were successful, the
        job is successful
      - the first opcode found which is canceling, canceled or finished with
        error determines the job status and ends the search
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, the remaining opcodes are queued and so is the job

    @return: the job status, one of the C{constants.JOB_STATUS_*} values

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status constant here, like all other branches do
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int
    @return: the numerically lowest priority value among the opcodes whose
        status is not in C{constants.OPS_FINALIZED}

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected, in opcode order

    """
    if newer_than is None:
      # Lower than any serial in use, hence everything is selected
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(entry for entry in op.log if entry[0] > serial)

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are only expected at the start of the list
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Sets the end timestamp to the current time.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    A queued job is canceled and finalized right away; a waiting job only
    gets its unfinished opcodes marked as canceling. Jobs in any other state
    are left untouched.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
465 099b2870 Michael Hanselmann
466 f1048938 Iustin Pop
467 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks used while one opcode of a queued job is being executed.

  The instance keeps references to the job queue, the job and the opcode
  and updates the latter two when notified.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts execution if the opcode or the queue require it.

    @raise CancelJob: if the opcode status is
        C{constants.OP_STATUS_CANCELING}
    @raise QueueShutdown: if the queue's C{AcceptingJobsUnlocked} returns
        a false value

    """
    cancel_requested = (self._op.status == constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    if cancel_requested:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    Decorated with C{locking.ssynchronized} using the C{_queue} member and
    C{shared=1}.

    """
    opcode = self._op

    assert opcode in self._job.ops
    assert opcode.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    opcode.status = constants.OP_STATUS_RUNNING
    opcode.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    Increments the job's log serial, appends the new entry to the opcode's
    log and updates the job in the queue with C{replicate=False}.

    @param timestamp: split timestamp of the entry
    @param log_type: type of the log entry
    @param log_msg: the message to log

    """
    job = self._job
    job.log_serial += 1

    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the type defaults to
    C{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    The opcode is expected to be waiting or canceling when this is called.

    @raise CancelJob: see L{_CheckCancel}
    @raise QueueShutdown: see L{_CheckCancel}

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    queue = self._queue
    return queue.SubmitManyJobs(jobs)
570 6a373640 Michael Hanselmann
571 031a3e57 Michael Hanselmann
572 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable comparing a job against the state a client has last seen.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: Job info last seen by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: Serial of the last log message seen by the LUXI
      client

    """
    self._prev_log_serial = prev_log_serial
    self._prev_job_info = prev_job_info
    self._squery = _SimpleJobQuery(fields)

  @staticmethod
  def _JsonRoundTrip(data):
    """Serializes data to JSON and loads it again.

    """
    return serializer.LoadJson(serializer.DumpJson(data))

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object, must be read-only
    @return: C{(job_info, log_entries)} if the job is no longer queued,
      waiting or running, if its info differs from the previous one or if
      there are new log entries; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    raw_info = self._squery(job)
    raw_log = job.GetLogEntries(self._prev_log_serial)

    # The values received from the client went through (de-)serialization,
    # which can change types (e.g. tuple to list) or lose precision. Our own
    # data is sent through the same round trip, otherwise the comparison
    # below could report differences where there are no significant ones.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = self._JsonRoundTrip(raw_info)
    log_entries = self._JsonRoundTrip(raw_log)

    # A job which is no longer queued, waiting or running won't change
    # anymore, hence there's no point in waiting for it.
    changed = status not in (constants.JOB_STATUS_QUEUED,
                             constants.JOB_STATUS_RUNNING,
                             constants.JOB_STATUS_WAITING)

    if not changed:
      changed = (job_info != self._prev_job_info)

    if not changed and log_entries:
      changed = (self._prev_log_serial != log_entries[0][0])

    if not changed:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
624 989a8bee Michael Hanselmann
625 989a8bee Michael Hanselmann
626 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  """Waits for inotify events on a single job file.

  """
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = (
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify,
                                           filename))
    self._notifier = (
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler))

    try:
      self._inotify_handler.enable()
    except Exception:
      # The notifier's file descriptor has to be closed explicitly, pyinotify
      # doesn't do it on its own
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify, re-enables the handler if it got disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is passed on multiplied by 1000
    got_events = notifier.check_events(timeout * 1000)
    if got_events:
      notifier.read_events()
    notifier.process_events()

    return got_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
674 989a8bee Michael Hanselmann
675 989a8bee Michael Hanselmann
676 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  """Waiter setting up the inotify-based file waiter only when needed.

  """
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filename = filename
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if not self._filewaiter:
      # Lazy setup: the inotify machinery is only created once the first
      # check has shown that waiting is actually necessary. The job file may
      # have changed between that check and this point, therefore return
      # right away and let the caller look at the file again. This avoids a
      # race condition.
      self._filewaiter = _JobFileChangesWaiter(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter, if one was created.

    """
    waiter = self._filewaiter
    if waiter:
      waiter.Close()
712 989a8bee Michael Hanselmann
713 989a8bee Michael Hanselmann
714 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the job status and log serial a client has seen last, instances wait
  until the job differs from that state and report the new state.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job and checks it for changes once.

    @param counter: Iterator providing the number of checks done so far
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning C{None} for an unchanged job
    @raise errors.JobLost: if the job can't be loaded
    @raise utils.RetryAgain: if the job hasn't changed
    @return: Result of C{check_fn}

    """
    checks_done = counter.next()
    if checks_done > 0:
      # Not the first check: give the job a bit more time to change again,
      # which performs better for jobs producing many changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is not None:
      return result

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: Result of the changes check; C{None} if the job was lost or
      inotify failed; L{constants.JOB_NOTCHANGED} if the timeout expired

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        poll_fn = compat.partial(self._CheckForChanges,
                                 counter, job_load_fn, check_fn)
        return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
    except (errors.InotifyError, errors.JobLost):
      return None
772 6c2549d6 Guido Trotter
773 6c2549d6 Guido Trotter
774 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Exceptions not derived from L{errors.GenericError} are converted to an
  L{errors.OpExecError} carrying their string representation first.

  @param err: Exception object
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
784 6760e4ed Michael Hanselmann
785 6760e4ed Michael Hanselmann
786 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
787 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
788 26d3fd2f Michael Hanselmann
    """Initializes this class.
789 26d3fd2f Michael Hanselmann

790 26d3fd2f Michael Hanselmann
    """
791 26d3fd2f Michael Hanselmann
    self._fn = fn
792 26d3fd2f Michael Hanselmann
    self._next = None
793 26d3fd2f Michael Hanselmann
794 26d3fd2f Michael Hanselmann
  def _Advance(self):
795 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
796 26d3fd2f Michael Hanselmann

797 26d3fd2f Michael Hanselmann
    """
798 26d3fd2f Michael Hanselmann
    if self._next is None:
799 26d3fd2f Michael Hanselmann
      self._next = self._fn()
800 26d3fd2f Michael Hanselmann
801 26d3fd2f Michael Hanselmann
  def Peek(self):
802 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
803 26d3fd2f Michael Hanselmann

804 26d3fd2f Michael Hanselmann
    """
805 26d3fd2f Michael Hanselmann
    self._Advance()
806 26d3fd2f Michael Hanselmann
    return self._next
807 26d3fd2f Michael Hanselmann
808 26d3fd2f Michael Hanselmann
  def Next(self):
809 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
810 26d3fd2f Michael Hanselmann

811 26d3fd2f Michael Hanselmann
    """
812 26d3fd2f Michael Hanselmann
    self._Advance()
813 26d3fd2f Michael Hanselmann
    result = self._next
814 26d3fd2f Michael Hanselmann
    self._next = None
815 26d3fd2f Michael Hanselmann
    return result
816 26d3fd2f Michael Hanselmann
817 26d3fd2f Michael Hanselmann
818 b80cc518 Michael Hanselmann
class _OpExecContext(object):
  """State kept for a single opcode while its job is being processed.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: Opcode object; its C{input} and C{priority} attributes are
      used
    @param index: Index of the opcode
    @type log_prefix: string
    @param log_prefix: Prefix for log messages about this opcode
    @type timeout_strategy_factory: callable
    @param timeout_strategy_factory: Called without arguments, must return an
      object with a C{NextAttempt} method

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify. The value found through
    # opcodes.DEPEND_ATTR is used directly instead of accessing the attribute
    # a second time under a hard-coded name.
    deps = getattr(op.input, opcodes.DEPEND_ATTR, None)
    if deps:
      self.jobdeps = deps[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: Whether the opcode's priority was increased

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      # A numerically lower value is a higher priority
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
868 26d3fd2f Michael Hanselmann
869 b80cc518 Michael Hanselmann
870 be760ba8 Michael Hanselmann
class _JobProcessor(object):
871 75d81fc8 Michael Hanselmann
  # Possible return values of __call__
  DEFER = 1
  WAITDEP = 2
  FINISHED = 3
874 75d81fc8 Michael Hanselmann
875 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores the objects needed for processing a job.

    @param queue: Job queue object
    @param opexec_fn: Function called to execute an opcode
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout strategy

    """
    self._timeout_strategy_factory = _timeout_strategy_factory
    self.job = job
    self.opexec_fn = opexec_fn
    self.queue = queue
884 be760ba8 Michael Hanselmann
885 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Execution context for the first opcode not yet finalized
    @raise errors.ProgrammerError: if there's no opcode left or an opcode is
      marked as running

    """
    # The iterator is stored on the job and works as a cache, later lookups
    # continue where the previous one stopped
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

    # Iterator exhausted without finding a pending opcode
    raise errors.ProgrammerError("Called for a finished job")
924 be760ba8 Michael Hanselmann
925 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The start timestamps of opcode and job are set if they're missing.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether status or timestamps were modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    status_changed = (op.status == constants.OP_STATUS_QUEUED)
    if status_changed:
      op.status = constants.OP_STATUS_WAITING

    op_started = (op.start_timestamp is None)
    if op_started:
      op.start_timestamp = TimeStampNow()

    job_started = (job.start_timestamp is None)
    if job_started:
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return status_changed or op_started or job_started
960 be760ba8 Michael Hanselmann
961 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are handled in order; satisfied ones are removed from the
    context's list.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op
    depmgr_cls = _JobDependencyManager

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == depmgr_cls.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)
        continue

      if depresult == depmgr_cls.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if depresult == depmgr_cls.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (depmgr_cls.WRONGSTATUS, depmgr_cls.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No dependencies left to process
    return False
1016 b95479a5 Michael Hanselmann
1017 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1018 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1019 be760ba8 Michael Hanselmann

1020 be760ba8 Michael Hanselmann
    """
1021 b80cc518 Michael Hanselmann
    op = opctx.op
1022 b80cc518 Michael Hanselmann
1023 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1024 be760ba8 Michael Hanselmann
1025 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1026 26d3fd2f Michael Hanselmann
1027 be760ba8 Michael Hanselmann
    try:
1028 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1029 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1030 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1031 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1032 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1033 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1034 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1035 9e49dfc5 Michael Hanselmann
1036 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1037 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1038 9e49dfc5 Michael Hanselmann
1039 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1040 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1041 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1042 9e49dfc5 Michael Hanselmann
1043 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1044 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1045 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1046 942e2262 Michael Hanselmann
1047 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1048 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1049 be760ba8 Michael Hanselmann
    except CancelJob:
1050 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1051 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1052 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1053 942e2262 Michael Hanselmann
1054 942e2262 Michael Hanselmann
    except QueueShutdown:
1055 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1056 942e2262 Michael Hanselmann
1057 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1058 942e2262 Michael Hanselmann
1059 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1060 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1061 942e2262 Michael Hanselmann
1062 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1063 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1064 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1065 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1066 be760ba8 Michael Hanselmann
    else:
1067 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1068 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1069 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1070 be760ba8 Michael Hanselmann
1071 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1072 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1073 be760ba8 Michael Hanselmann

    At most one opcode is handled per call. Jobs already in a finalized status
    are reported as C{FINISHED} without doing anything. The queue lock is
    acquired in shared mode for the duration of the call, except while the
    opcode itself is executed.

1074 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests; called without arguments
      before the next opcode is looked up (only if C{__debug__} is true and no
      opcode context is pending from a previous call)
1075 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1076 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1077 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1078 be760ba8 Michael Hanselmann

1079 be760ba8 Michael Hanselmann
    """
1080 be760ba8 Michael Hanselmann
    queue = self.queue
1081 be760ba8 Michael Hanselmann
    job = self.job
1082 be760ba8 Michael Hanselmann
1083 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1084 be760ba8 Michael Hanselmann
1085 be760ba8 Michael Hanselmann
    # The queue lock is held in shared mode from here on; it's only dropped
    # while the opcode itself is executed and released in the "finally" clause
    queue.acquire(shared=1)
1086 be760ba8 Michael Hanselmann
    try:
1087 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1088 be760ba8 Michael Hanselmann
1089 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1090 c0f6d0d8 Michael Hanselmann
1091 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1092 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1093 75d81fc8 Michael Hanselmann
        return self.FINISHED
1094 66bd7445 Michael Hanselmann
1095 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1096 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1097 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1098 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1099 26d3fd2f Michael Hanselmann
      else:
1100 26d3fd2f Michael Hanselmann
        # Hook for tests, skipped when assertions are disabled
        if __debug__ and _nextop_fn:
1101 26d3fd2f Michael Hanselmann
          _nextop_fn()
1102 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1103 26d3fd2f Michael Hanselmann
1104 b80cc518 Michael Hanselmann
      op = opctx.op
1105 be760ba8 Michael Hanselmann
1106 be760ba8 Michael Hanselmann
      # Consistency check
1107 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1108 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1109 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1110 be760ba8 Michael Hanselmann
1111 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1112 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1113 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1114 be760ba8 Michael Hanselmann
1115 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1116 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1117 26d3fd2f Michael Hanselmann
1118 b95479a5 Michael Hanselmann
      # Result of the dependency check below; a true value keeps the opcode
      # from being executed and makes this call return WAITDEP
      waitjob = None
1119 b95479a5 Michael Hanselmann
1120 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1121 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1122 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1123 30c945d0 Michael Hanselmann
1124 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1125 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1126 5fd6b694 Michael Hanselmann
          # Write to disk
1127 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1128 be760ba8 Michael Hanselmann
1129 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1130 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1131 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1132 b95479a5 Michael Hanselmann
        assert waitjob is None
1133 b95479a5 Michael Hanselmann
1134 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1135 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1136 be760ba8 Michael Hanselmann
1137 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1138 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1139 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1140 be760ba8 Michael Hanselmann
1141 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1142 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1143 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1144 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1145 be760ba8 Michael Hanselmann
1146 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1147 b95479a5 Michael Hanselmann
1148 b95479a5 Michael Hanselmann
          # Run the opcode without holding the queue lock; the lock is
          # re-acquired (shared) even if the execution raises an exception
          queue.release()
1149 b95479a5 Michael Hanselmann
          try:
1150 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1151 b95479a5 Michael Hanselmann
          finally:
1152 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1153 b95479a5 Michael Hanselmann
1154 b95479a5 Michael Hanselmann
          op.status = op_status
1155 b95479a5 Michael Hanselmann
          op.result = op_result
1156 b95479a5 Michael Hanselmann
1157 b95479a5 Michael Hanselmann
          assert not waitjob
1158 be760ba8 Michael Hanselmann
1159 942e2262 Michael Hanselmann
        if op.status in (constants.OP_STATUS_WAITING,
1160 942e2262 Michael Hanselmann
                         constants.OP_STATUS_QUEUED):
1161 942e2262 Michael Hanselmann
          # waiting: Couldn't get locks in time
1162 942e2262 Michael Hanselmann
          # queued: Queue is shutting down
1163 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1164 be760ba8 Michael Hanselmann
        else:
1165 26d3fd2f Michael Hanselmann
          # Finalize opcode
1166 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1167 be760ba8 Michael Hanselmann
1168 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1169 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1170 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1171 26d3fd2f Michael Hanselmann
          else:
1172 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1173 be760ba8 Michael Hanselmann
1174 942e2262 Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
1175 942e2262 Michael Hanselmann
        # Queue is shutting down
1176 942e2262 Michael Hanselmann
        assert not waitjob
1177 942e2262 Michael Hanselmann
1178 942e2262 Michael Hanselmann
        finalize = False
1179 942e2262 Michael Hanselmann
1180 942e2262 Michael Hanselmann
        # Reset context
1181 942e2262 Michael Hanselmann
        job.cur_opctx = None
1182 942e2262 Michael Hanselmann
1183 942e2262 Michael Hanselmann
        # In no case must the status be finalized here
1184 942e2262 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1185 942e2262 Michael Hanselmann
1186 942e2262 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1187 be760ba8 Michael Hanselmann
        finalize = False
1188 be760ba8 Michael Hanselmann
1189 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1190 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1191 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1192 be760ba8 Michael Hanselmann
1193 26d3fd2f Michael Hanselmann
        # Keep around for another round
1194 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1195 be760ba8 Michael Hanselmann
1196 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1197 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1198 be760ba8 Michael Hanselmann
1199 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1200 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1201 be760ba8 Michael Hanselmann
1202 be760ba8 Michael Hanselmann
      else:
1203 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1204 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1205 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1206 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1207 26d3fd2f Michael Hanselmann
1208 26d3fd2f Michael Hanselmann
        # Reset context
1209 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1210 26d3fd2f Michael Hanselmann
1211 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1212 26d3fd2f Michael Hanselmann
          finalize = False
1213 26d3fd2f Michael Hanselmann
1214 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1215 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1216 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1217 26d3fd2f Michael Hanselmann
1218 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1219 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1220 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1221 26d3fd2f Michael Hanselmann
          finalize = True
1222 be760ba8 Michael Hanselmann
1223 26d3fd2f Michael Hanselmann
          # Consistency check
1224 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1225 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1226 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1227 be760ba8 Michael Hanselmann
1228 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1229 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1230 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1231 26d3fd2f Michael Hanselmann
          finalize = True
1232 26d3fd2f Michael Hanselmann
1233 26d3fd2f Michael Hanselmann
        else:
1234 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1235 26d3fd2f Michael Hanselmann
1236 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1237 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1238 66bd7445 Michael Hanselmann
          finalize = True
1239 66bd7445 Michael Hanselmann
1240 66bd7445 Michael Hanselmann
        if finalize:
1241 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1242 66bd7445 Michael Hanselmann
          job.Finalize()
1243 26d3fd2f Michael Hanselmann
1244 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1245 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1246 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1247 be760ba8 Michael Hanselmann
1248 b95479a5 Michael Hanselmann
        assert not waitjob
1249 b95479a5 Michael Hanselmann
1250 66bd7445 Michael Hanselmann
        if finalize:
1251 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1252 75d81fc8 Michael Hanselmann
          return self.FINISHED
1253 be760ba8 Michael Hanselmann
1254 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1255 b95479a5 Michael Hanselmann
1256 75d81fc8 Michael Hanselmann
      # Job isn't finished yet; the returned value tells the caller whether
      # the job waits for a dependency or has to be deferred
      if waitjob:
1257 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1258 75d81fc8 Michael Hanselmann
      else:
1259 75d81fc8 Michael Hanselmann
        return self.DEFER
1260 be760ba8 Michael Hanselmann
    finally:
1261 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1262 be760ba8 Michael Hanselmann
      queue.release()
1263 be760ba8 Michael Hanselmann
1264 be760ba8 Michael Hanselmann
1265 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts upon the status returned by L{_JobProcessor} for a job.

  Intended to be called from a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    with the job's ID when the job has finished
  @param job: Job which was processed
  @param result: Status returned by the job processor (C{FINISHED}, C{DEFER}
    or C{WAITDEP})
  @raise workerpool.DeferTask: If the job processor asked for the job to be
    deferred; carries the job's current priority
  @raise errors.ProgrammerError: If the status is none of the known ones

  """
  if result == _JobProcessor.FINISHED:
    # Jobs waiting for this one can be notified now
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Hand the job back for another round
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do here, the dependency manager re-schedules the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1286 df5a5730 Michael Hanselmann
1287 df5a5730 Michael Hanselmann
1288 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker processing jobs handed out by the job queue's worker pool.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding the job's processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # A job can show up in the task list more than once: it may register for
    # a dependency job which then notifies before the first worker is done.
    # The per-job lock makes sure only one worker is active on a job.
    processor_lock = job.processor_lock
    processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    The per-job lock must be held by the caller.

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Names the task after the job and, if given, the opcode.

      """
      return self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    processor = mcpu.Processor(queue.context, job.id)

    # Opcode execution is wrapped so the task name reflects the opcode
    # currently being executed
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               processor.ExecOpCode)

    depmgr = queue.depmgr
    result = _JobProcessor(queue, execop_fn, job)()
    _EvaluateJobProcessorResult(depmgr, job, result)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the task name set to include the opcode.

    The name is reset (C{setname_fn(None)}) once execution is over, no matter
    whether it succeeded.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      result = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

    return result

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @return: C{Job<id>}, followed by a slash and the opcode's tiny summary if
      an opcode was given

    """
    job_part = "Job%s" % job.id

    if op:
      return "/".join([job_part, op.TinySummary()])

    return job_part
1360 0aeeb6e3 Michael Hanselmann
1361 e2715f69 Michael Hanselmann
1362 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool processing the jobs of a job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it belongs to.

    The base class is set up with the name "Jq", C{JOBQUEUE_THREADS} and
    L{_JobQueueWorker}.

    @param queue: Job queue; made available as the C{queue} attribute

    """
    pool_name = "Jq"
    super(_JobQueueWorkerPool, self).__init__(pool_name, JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
1371 e2715f69 Michael Hanselmann
1372 e2715f69 Michael Hanselmann
1373 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1374 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1375 b95479a5 Michael Hanselmann

1376 b95479a5 Michael Hanselmann
  """
1377 b95479a5 Michael Hanselmann
  (WAIT,
1378 b95479a5 Michael Hanselmann
   ERROR,
1379 b95479a5 Michael Hanselmann
   CANCEL,
1380 b95479a5 Michael Hanselmann
   CONTINUE,
1381 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1382 b95479a5 Michael Hanselmann
1383 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1384 b95479a5 Michael Hanselmann
    """Initializes this class.
1385 b95479a5 Michael Hanselmann

1386 b95479a5 Michael Hanselmann
    """
1387 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1388 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1389 b95479a5 Michael Hanselmann
1390 b95479a5 Michael Hanselmann
    self._waiters = {}
1391 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1392 b95479a5 Michael Hanselmann
1393 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1394 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1395 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1396 fcb21ad7 Michael Hanselmann

1397 fcb21ad7 Michael Hanselmann
    @type requested: set
1398 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1399 fcb21ad7 Michael Hanselmann

1400 fcb21ad7 Michael Hanselmann
    """
1401 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1402 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1403 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1404 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1405 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1406 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1407 fcb21ad7 Michael Hanselmann
            if waiters]
1408 fcb21ad7 Michael Hanselmann
1409 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1410 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1411 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1412 b95479a5 Michael Hanselmann

1413 b95479a5 Michael Hanselmann
    """
1414 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1415 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1416 b95479a5 Michael Hanselmann
1417 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1418 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1419 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1420 b95479a5 Michael Hanselmann

1421 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1422 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1423 b95479a5 Michael Hanselmann

1424 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1425 b95479a5 Michael Hanselmann
    @param job: Job object
1426 b95479a5 Michael Hanselmann
    @type dep_job_id: string
1427 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1428 b95479a5 Michael Hanselmann
    @type dep_status: list
1429 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1430 b95479a5 Michael Hanselmann

1431 b95479a5 Michael Hanselmann
    """
1432 b95479a5 Michael Hanselmann
    assert ht.TString(job.id)
1433 b95479a5 Michael Hanselmann
    assert ht.TString(dep_job_id)
1434 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1435 b95479a5 Michael Hanselmann
1436 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1437 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1438 b95479a5 Michael Hanselmann
1439 b95479a5 Michael Hanselmann
    # Get status of dependency job
1440 b95479a5 Michael Hanselmann
    try:
1441 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1442 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1443 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1444 b95479a5 Michael Hanselmann
1445 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1446 b95479a5 Michael Hanselmann
1447 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1448 b95479a5 Michael Hanselmann
1449 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1450 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1451 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1452 b95479a5 Michael Hanselmann
      return (self.WAIT,
1453 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1454 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1455 b95479a5 Michael Hanselmann
1456 b95479a5 Michael Hanselmann
    # Remove from waiters list
1457 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1458 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1459 b95479a5 Michael Hanselmann
1460 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1461 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1462 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1463 b95479a5 Michael Hanselmann
1464 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1465 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1466 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1467 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1468 b95479a5 Michael Hanselmann
1469 b95479a5 Michael Hanselmann
    else:
1470 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1471 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1472 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1473 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1474 b95479a5 Michael Hanselmann
1475 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1476 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1477 37d76f1e Michael Hanselmann

1478 37d76f1e Michael Hanselmann
    """
1479 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1480 37d76f1e Michael Hanselmann
                   if not waiters]:
1481 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1482 37d76f1e Michael Hanselmann
1483 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1484 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1485 b95479a5 Michael Hanselmann

1486 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1487 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1488 b95479a5 Michael Hanselmann
    @type job_id: string
1489 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1490 b95479a5 Michael Hanselmann

1491 b95479a5 Michael Hanselmann
    """
1492 b95479a5 Michael Hanselmann
    assert ht.TString(job_id)
1493 b95479a5 Michael Hanselmann
1494 37d76f1e Michael Hanselmann
    self._lock.acquire()
1495 37d76f1e Michael Hanselmann
    try:
1496 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1497 37d76f1e Michael Hanselmann
1498 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1499 37d76f1e Michael Hanselmann
    finally:
1500 37d76f1e Michael Hanselmann
      self._lock.release()
1501 37d76f1e Michael Hanselmann
1502 b95479a5 Michael Hanselmann
    if jobs:
1503 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1504 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1505 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1506 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1507 b95479a5 Michael Hanselmann
1508 b95479a5 Michael Hanselmann
1509 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1510 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1511 ea03467c Iustin Pop

1512 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1513 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1514 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1515 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1516 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1517 ea03467c Iustin Pop

1518 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1519 f1da30e6 Michael Hanselmann

1520 6c881c52 Iustin Pop
  Example::
1521 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1522 6c881c52 Iustin Pop
    @_RequireOpenQueue
1523 6c881c52 Iustin Pop
    def Example(self):
1524 6c881c52 Iustin Pop
      pass
1525 db37da70 Michael Hanselmann

1526 6c881c52 Iustin Pop
  """
1527 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1528 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1529 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1530 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1531 6c881c52 Iustin Pop
  return wrapper
1532 db37da70 Michael Hanselmann
1533 db37da70 Michael Hanselmann
1534 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1535 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1536 c8d0be94 Michael Hanselmann

1537 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1538 c8d0be94 Michael Hanselmann

1539 c8d0be94 Michael Hanselmann
  """
1540 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1541 c8d0be94 Michael Hanselmann
    """Wrapper function.
1542 c8d0be94 Michael Hanselmann

1543 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1544 c8d0be94 Michael Hanselmann

1545 c8d0be94 Michael Hanselmann
    """
1546 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1547 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1548 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1549 c8d0be94 Michael Hanselmann
    if self._drained:
1550 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1551 6d5ea385 Michael Hanselmann
1552 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1553 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1554 6d5ea385 Michael Hanselmann
1555 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1556 c8d0be94 Michael Hanselmann
  return wrapper
1557 c8d0be94 Michael Hanselmann
1558 c8d0be94 Michael Hanselmann
1559 6c881c52 Iustin Pop
class JobQueue(object):
1560 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1561 db37da70 Michael Hanselmann

1562 6c881c52 Iustin Pop
  """
1563 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Initializes the job queue object and then inspects the jobs currently
    stored on disk, either to restart them (if they were queued) or to
    abort them (if they were already running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with code holding it in shared mode as well as with code
    # holding it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    # Expose the lock's primitives directly on the queue object
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes; only master candidates are tracked
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip

    # Remove master node
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      # Deliberately catching everything: the workers have to be stopped
      # whatever went wrong, and the exception is re-raised unchanged
      self._wpool.TerminateWorkers()
      raise
1629 711b5124 Michael Hanselmann
1630 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    Queued jobs are scheduled again as they are. Jobs found waiting are reset
    to queued and scheduled again, while jobs found running or canceling are
    marked as failed and finalized.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    to_restart = []

    job_ids = self._GetJobIDsUnlocked()
    total = len(job_ids)
    last_report = time.time()

    for (idx, job_id) in enumerate(job_ids):
      # Give an update every 1000 jobs or 10 seconds, and for the last job
      if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
          idx == (total - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, total - 1, 100.0 * (idx + 1) / total)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        to_restart.append(job)
        continue

      if status not in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING):
        # Nothing to do for jobs in any other state
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        to_restart.append(job)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

      # Store the modified opcode states
      self.UpdateJobUnlocked(job)

    if to_restart:
      logging.info("Restarting %s jobs", len(to_restart))
      self._EnqueueJobsUnlocked(to_restart)

    logging.info("Job queue inspection finished")
1686 85f03e0d Michael Hanselmann
1687 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Creates an RPC runner using this queue's context.

    @param address_list: handed unchanged to L{rpc.JobQueueRunner}
    @return: the newly created runner

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1692 fb1ffbca Michael Hanselmann
1693 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. Nodes which are not
    master candidates are then dropped from the replication list, while
    master candidates receive all non-archived job files plus the serial
    file and are added to the replication list. RPC failures are logged
    only.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = self._GetRpc(None).call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs ...
    to_upload = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]

    # ... followed by the current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for path in to_upload:
      # Read file content
      content = utils.ReadFile(path)

      runner = self._GetRpc(addrs)
      result = runner.call_jobqueue_update([name], path, content)
      upload_error = result[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    self._nodes[name] = node.primary_ip
1739 d2e03a33 Michael Hanselmann
1740 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Names not present in the replication list are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not registered, nothing to forget
      pass
1750 23752136 Michael Hanselmann
1751 7e950d31 Iustin Pop
  @staticmethod
1752 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1753 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1754 ea03467c Iustin Pop

1755 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1756 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1757 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1758 ea03467c Iustin Pop

1759 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1760 ea03467c Iustin Pop
    @type nodes: list
1761 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1762 ea03467c Iustin Pop
    @type failmsg: str
1763 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1764 ea03467c Iustin Pop

1765 ea03467c Iustin Pop
    """
1766 e74798c1 Michael Hanselmann
    failed = []
1767 e74798c1 Michael Hanselmann
    success = []
1768 e74798c1 Michael Hanselmann
1769 e74798c1 Michael Hanselmann
    for node in nodes:
1770 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1771 c8457ce7 Iustin Pop
      if msg:
1772 e74798c1 Michael Hanselmann
        failed.append(node)
1773 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1774 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1775 c8457ce7 Iustin Pop
      else:
1776 c8457ce7 Iustin Pop
        success.append(node)
1777 e74798c1 Michael Hanselmann
1778 e74798c1 Michael Hanselmann
    # +1 for the master node
1779 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1780 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1781 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1782 e74798c1 Michael Hanselmann
1783 99aabbed Iustin Pop
  def _GetNodeIp(self):
1784 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1785 99aabbed Iustin Pop

1786 ea03467c Iustin Pop
    @rtype: (list, list)
1787 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1788 ea03467c Iustin Pop
        names and the second one with the node addresses
1789 ea03467c Iustin Pop

1790 99aabbed Iustin Pop
    """
1791 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1792 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1793 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1794 99aabbed Iustin Pop
    return name_list, addr_list
1795 99aabbed Iustin Pop
1796 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The local file is replaced first, owned by the master daemon's user and
    group. If requested, the new contents are then sent to all nodes in the
    replication list and the outcome is checked (failures are only logged).

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data,
                    uid=ents.masterd_uid, gid=ents.masterd_gid)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    runner = self._GetRpc(addrs)
    result = runner.call_jobqueue_update(names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1818 23752136 Michael Hanselmann
1819 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are first done in the local queue directory (creating
    missing target directories) and then requested from all the other
    nodes we have; remote failures are only logged.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... then the same on all nodes
    (names, addrs) = self._GetNodeIp()
    runner = self._GetRpc(addrs)
    result = runner.call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1837 abc1f2ce Michael Hanselmann
1838 7e950d31 Iustin Pop
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Right now this is just C{str(job_id)} after some sanity checks; having
    a dedicated function keeps a future change of the job ID format in one
    place.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the ID is not an integer or is
        negative

    """
    if isinstance(job_id, (int, long)):
      if job_id < 0:
        raise errors.ProgrammerError("Job ID %s is negative" % job_id)
      return str(job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1858 85f03e0d Michael Hanselmann
1859 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into directories of C{JOBS_PER_ARCHIVE_DIRECTORY}
    consecutive IDs each; the directory name is the index of the group.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: plain "/" would produce a float (and hence a
    # name like "0.0012") whenever true division is in effect
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1870 58b22b6e Michael Hanselmann
1871 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The serial
    file is updated (and replicated) before the in-memory counter is
    advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: the newly allocated job identifiers, in ascending order

    """
    assert ht.TPositiveInt(count)

    # Range of serials handed out by this call
    first = self._last_serial + 1
    last = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % last, True)

    new_ids = [self._FormatJobID(serial)
               for serial in range(first, last + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = last

    assert len(new_ids) == count

    return new_ids
1900 f1da30e6 Michael Hanselmann
1901 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file inside the queue directory

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1912 f1da30e6 Michael Hanselmann
1913 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, subdir, file_name)
1925 0cb94105 Michael Hanselmann
1926 cb66225d Michael Hanselmann
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    # Only file names matching the job file pattern are considered
    matches = [constants.JOB_FILE_RE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
1948 911a495b Iustin Pop
1949 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache. A job file which can't
    be parsed is moved to the archive and treated as missing.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      assert cached.writable, "Found read-only job in memcache"
      return cached

    try:
      job = self._LoadJobFromDisk(job_id, False)
    except errors.JobFileCorrupted:
      live_path = self._GetJobPath(job_id)
      archived_path = self._GetArchivedJobPath(job_id)
      if live_path != archived_path:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(live_path, archived_path)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1988 162c8636 Guido Trotter
1989 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1990 162c8636 Guido Trotter
    """Load the given job file from disk.
1991 162c8636 Guido Trotter

1992 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1993 162c8636 Guido Trotter

1994 162c8636 Guido Trotter
    @type job_id: string
1995 162c8636 Guido Trotter
    @param job_id: job identifier
1996 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1997 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1998 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1999 162c8636 Guido Trotter
    @return: either None or the job object
2000 162c8636 Guido Trotter

2001 162c8636 Guido Trotter
    """
2002 c0f6d0d8 Michael Hanselmann
    path_functions = [(self._GetJobPath, True)]
2003 194c8ca4 Michael Hanselmann
2004 194c8ca4 Michael Hanselmann
    if try_archived:
2005 c0f6d0d8 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, False))
2006 194c8ca4 Michael Hanselmann
2007 194c8ca4 Michael Hanselmann
    raw_data = None
2008 c0f6d0d8 Michael Hanselmann
    writable_default = None
2009 194c8ca4 Michael Hanselmann
2010 c0f6d0d8 Michael Hanselmann
    for (fn, writable_default) in path_functions:
2011 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
2012 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
2013 194c8ca4 Michael Hanselmann
      try:
2014 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
2015 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
2016 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
2017 194c8ca4 Michael Hanselmann
          raise
2018 194c8ca4 Michael Hanselmann
      else:
2019 194c8ca4 Michael Hanselmann
        break
2020 194c8ca4 Michael Hanselmann
2021 194c8ca4 Michael Hanselmann
    if not raw_data:
2022 194c8ca4 Michael Hanselmann
      return None
2023 13998ef2 Michael Hanselmann
2024 c0f6d0d8 Michael Hanselmann
    if writable is None:
2025 c0f6d0d8 Michael Hanselmann
      writable = writable_default
2026 c0f6d0d8 Michael Hanselmann
2027 94ed59a5 Iustin Pop
    try:
2028 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2029 c0f6d0d8 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable)
2030 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2031 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2032 94ed59a5 Iustin Pop
2033 ac0930b9 Iustin Pop
    return job
2034 f1da30e6 Michael Hanselmann
2035 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed on to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2055 0f9c08dc Guido Trotter
2056 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    The size is the number of job files currently found in the queue
    directory.

    """
    # Sorting is not needed just for counting
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)
2061 20571a26 Guido Trotter
2062 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is stored through L{jstore.SetDrainFlag} first; the in-memory
    copy is updated only after that call returned.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag
    return True
2076 3ccafd0e Iustin Pop
2077 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    The job is validated, written to disk, counted in the queue size and
    added to the memory cache. It is returned to the caller and not
    scheduled for execution here.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority and dependencies of every opcode
    for (idx, op) in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        prio_msg = ("Opcode %s has invalid priority %s, allowed"
                    " are %s" % (idx, op.priority, allowed))
        raise errors.GenericError(prio_msg)

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        dep_msg = ("Opcode %s has invalid dependencies, must"
                   " match %s: %s" %
                   (idx, opcodes.TNoRelativeJobDependencies, dependencies))
        raise errors.GenericError(dep_msg)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
2122 f1da30e6 Michael Hanselmann
2123 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Submits a single job and hands it to the worker pool.

    A new job ID is requested from L{_NewSerialsUnlocked}, the job is
    created using L{_SubmitJobUnlocked} and then passed to
    L{_EnqueueJobsUnlocked}.

    @type ops: list
    @param ops: the opcodes making up the job
    @return: the ID of the new job
    @see: L{_SubmitJobUnlocked}

    """
    [job_id] = self._NewSerialsUnlocked(1)
    new_job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobsUnlocked([new_job])
    return job_id
2135 2971c913 Iustin Pop
2136 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Submits several jobs and hands the accepted ones to the worker pool.

    One job ID per entry of C{jobs} is requested up front. Submission
    itself is done by L{_SubmitManyJobsUnlocked}, called with an empty list
    of previous job IDs.

    @type jobs: list
    @param jobs: the jobs to submit
    @return: the results as returned by L{_SubmitManyJobsUnlocked}
    @see: L{_SubmitJobUnlocked}

    """
    job_ids = self._NewSerialsUnlocked(len(jobs))
    submitted = self._SubmitManyJobsUnlocked(jobs, job_ids, [])
    (results, added_jobs) = submitted

    self._EnqueueJobsUnlocked(added_jobs)

    return results
2153 2971c913 Iustin Pop
2154 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Appends the summaries of a job's opcodes to an error message.

    @param msg: the error message
    @param ops: the opcodes of the job; C{Summary} is called on each
    @rtype: string
    @return: C{msg}, followed by "; opcodes " and the result of
        L{utils.CommaJoin} over the opcode summaries

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2161 b247c6fc Michael Hanselmann
2162 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Replaces relative job IDs in dependencies.

    Job IDs for which L{ht.TRelativeJobId} holds are passed to
    C{resolve_fn}; all other job IDs are kept as they are. Resolving stops
    at the first relative ID for which C{resolve_fn} raises C{IndexError}.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies as C{(job ID, status)} pairs
    @rtype: tuple
    @return: C{(True, resolved dependencies)} on success, C{(False, error
        message)} if a relative job ID could not be resolved

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Not a relative ID, nothing to resolve
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0
      try:
        actual_job_id = resolve_fn(dep_job_id)
      except IndexError:
        # Abort
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((actual_job_id, dep_status))

    return (True, resolved)
2190 b247c6fc Michael Hanselmann
2191 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2192 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2193 b247c6fc Michael Hanselmann

2194 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2195 b247c6fc Michael Hanselmann

2196 b247c6fc Michael Hanselmann
    """
2197 b247c6fc Michael Hanselmann
    results = []
2198 b247c6fc Michael Hanselmann
    added_jobs = []
2199 b247c6fc Michael Hanselmann
2200 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2201 b247c6fc Michael Hanselmann
      assert reljobid < 0
2202 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2203 b247c6fc Michael Hanselmann
2204 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2205 b247c6fc Michael Hanselmann
      for op in ops:
2206 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2207 b247c6fc Michael Hanselmann
          (status, data) = \
2208 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2209 b247c6fc Michael Hanselmann
                                         op.depends)
2210 b247c6fc Michael Hanselmann
          if not status:
2211 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2212 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2213 b247c6fc Michael Hanselmann
            break
2214 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2215 b247c6fc Michael Hanselmann
          op.depends = data
2216 b247c6fc Michael Hanselmann
      else:
2217 b247c6fc Michael Hanselmann
        try:
2218 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2219 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2220 b247c6fc Michael Hanselmann
          status = False
2221 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2222 b247c6fc Michael Hanselmann
        else:
2223 b247c6fc Michael Hanselmann
          status = True
2224 b247c6fc Michael Hanselmann
          data = job_id
2225 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2226 b247c6fc Michael Hanselmann
2227 b247c6fc Michael Hanselmann
      results.append((status, data))
2228 b247c6fc Michael Hanselmann
2229 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2230 b247c6fc Michael Hanselmann
2231 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Adds jobs to the worker pool's queue.

    Wrapper around L{_EnqueueJobsUnlocked}, decorated with
    L{locking.ssynchronized}.

    @type jobs: list
    @param jobs: List of all jobs
    @return: the return value of L{_EnqueueJobsUnlocked}

    """
    return self._EnqueueJobsUnlocked(jobs)
2240 75d81fc8 Michael Hanselmann
2241 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2242 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2243 75d81fc8 Michael Hanselmann

2244 75d81fc8 Michael Hanselmann
    @type jobs: list
2245 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2246 75d81fc8 Michael Hanselmann

2247 75d81fc8 Michael Hanselmann
    """
2248 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2249 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2250 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
2251 7b5c4a69 Michael Hanselmann
2252 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    The job is loaded from disk using L{SafeLoadJobFromDisk} in
    non-writable mode; the in-memory cache is not consulted.

    @type job_id: string
    @param job_id: Job ID; values which are not strings are converted using
        L{_FormatJobID}
    @return: the value returned by the job's C{CalcStatus}
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    # The check for a missing job must come before anything touching the job
    # object, otherwise a job which couldn't be loaded leads to an
    # AttributeError instead of the documented JobLost exception
    if not job:
      raise errors.JobLost("Job %s not found" % job_id)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    return job.CalcStatus()
2274 b95479a5 Michael Hanselmann
2275 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Writes a job to its file in the queue.

    This function must be called after a job has been modified. The
    serialized job is passed to L{_UpdateJobQueueFile} together with the
    C{replicate} flag.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      lacks_end_timestamp = job.end_timestamp is None
      # Exactly one may hold: a finalized job has an end timestamp, a job
      # which is not finalized has none
      assert (is_finalized ^ lacks_end_timestamp)
      assert job.writable, "Can't update read-only job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2298 ac0930b9 Iustin Pop
2299 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual work is delegated to a L{_WaitForJobChangesHelper} instance,
    which receives the path of the job file and a function loading the job
    from disk in non-writable mode.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_job_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                                 writable=False)
    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, load_job_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
2330 dfe57c22 Michael Hanselmann
2331 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Whether the job can be cancelled is decided by the job's C{Cancel}
    method. If it reports success, the job is written to disk.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}; C{success} is C{False} if the job could
        not be loaded

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      assert job.writable, "Can't cancel read-only job"

      (cancelled, message) = job.Cancel()
      if cancelled:
        # If the job was finalized (e.g. cancelled), this is the final write
        # allowed. The job can be archived anytime.
        self.UpdateJobUnlocked(job)

      return (cancelled, message)

    logging.debug("Job %s not found", job_id)
    return (False, "Job %s not found" % job_id)
2359 fbf0262f Michael Hanselmann
2360 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Only jobs whose status is in L{constants.JOBS_FINALIZED} are
    considered; their files are renamed from the job path to the archived
    job path. Other jobs are skipped with a debug message.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of jobs selected for archival (the outcome of renaming
        the files isn't checked)

    """
    finalized_jobs = []
    renames = []

    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() in constants.JOBS_FINALIZED:
        finalized_jobs.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in finalized_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(finalized_jobs)
2397 78d12585 Michael Hanselmann
2398 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    The job is loaded and passed to L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; C{False} if it could not be loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      return self._ArchiveJobsUnlocked([job]) == 1

    logging.debug("Job %s not found", job_id)
    return False
2419 07cd723a Iustin Pop
2420 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    A job is selected if it is older than the C{age} parameter. The age is
    computed from the first element of the job's end timestamp; jobs
    without an end timestamp use the start timestamp, and those without a
    start timestamp the received timestamp. The special age C{-1} selects
    every job. Selected jobs are passed to L{_ArchiveJobsUnlocked} in
    batches of ten.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: number of seconds after which no further jobs are
        looked at
    @rtype: tuple
    @return: C{(archived count, remaining)}; C{remaining} is the number of
        job IDs following the last one reached by the loop

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    deadline = now + timeout
    archived_count = 0
    last_touched = 0
    batch = []

    all_job_ids = self._GetJobIDsUnlocked()
    for (idx, job_id) in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > deadline:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the latest timestamp available for the job
      if job.end_timestamp is not None:
        job_timestamp = job.end_timestamp
      elif job.start_timestamp is not None:
        job_timestamp = job.start_timestamp
      else:
        job_timestamp = job.received_timestamp

      if not (age == -1 or now - job_timestamp[0] > age):
        continue

      batch.append(job)

      # Archive 10 jobs at a time
      if len(batch) >= 10:
        archived_count += self._ArchiveJobsUnlocked(batch)
        batch = []

    # Archive whatever is left, including after running into the timeout
    if batch:
      archived_count += self._ArchiveJobsUnlocked(batch)

    return (archived_count, len(all_job_ids) - last_touched)
2475 07cd723a Iustin Pop
2476 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a query object and loads the jobs for it.

    If the query doesn't name specific jobs (C{RequestedNames} returns
    C{None}), all job IDs of the queue are used and jobs which can't be
    loaded are left out. Otherwise every requested job ID is included,
    paired with C{None} if loading the job failed.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @rtype: tuple
    @return: C{(query object, list of (job ID, job) tuples, whether all
        jobs were listed)}

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    requested_ids = qobj.RequestedNames()
    list_all = (requested_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
    else:
      job_ids = requested_ids

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if list_all and job is None:
        # Only explicitly requested jobs are reported when they're missing
        continue
      jobs.append((job_id, job))

    return (qobj, jobs, list_all)
2497 e07f7f7a Michael Hanselmann
2498 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    The jobs are collected by L{_Query} and the response is built by
    L{query.GetQueryResponse}.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the value returned by L{query.GetQueryResponse}

    """
    (qobj, job_data, list_all) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, job_data, sort_by_name=list_all)
2510 e07f7f7a Michael Hanselmann
2511 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    The job IDs are turned into a filter on the "id" field using
    L{qlang.MakeSimpleFilter}; the result is produced by the query
    object's C{OldStyleQuery} method.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    id_filter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, job_data, list_all) = self._Query(fields, id_filter)

    return qobj.OldStyleQuery(job_data, sort_by_name=list_all)
2528 e2715f69 Michael Hanselmann
2529 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    still_accepting = self._accepting_jobs

    # Only the first call needs to change anything
    if still_accepting:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
2553 6d5ea385 Michael Hanselmann
2554 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Tells whether the queue still accepts jobs.

    L{PrepareShutdown} clears the underlying flag; from then on no new
    jobs are accepted and the queue is shutting down.

    @rtype: bool
    @return: the current value of the "accepting jobs" flag

    """
    accepting = self._accepting_jobs
    return accepting
2564 942e2262 Michael Hanselmann
2565 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This terminates the workers of the worker pool and closes the queue's
    file lock, which is reset to C{None} afterwards.

    """
    self._wpool.TerminateWorkers()

    # Close the lock before dropping the reference to it
    self._queue_filelock.Close()
    self._queue_filelock = None