Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 690e509d

History | View | Annotate | Download (79.8 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 76b62028 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 f1da30e6 Michael Hanselmann
from ganeti import opcodes
52 580b1fdd Jose A. Lopes
from ganeti import opcodes_base
53 7a1ecaed Iustin Pop
from ganeti import errors
54 e2715f69 Michael Hanselmann
from ganeti import mcpu
55 7996a135 Iustin Pop
from ganeti import utils
56 04ab05ce Michael Hanselmann
from ganeti import jstore
57 c3f0a12f Iustin Pop
from ganeti import rpc
58 82b22e19 René Nussbaumer
from ganeti import runtime
59 a744b676 Manuel Franceschini
from ganeti import netutils
60 989a8bee Michael Hanselmann
from ganeti import compat
61 b95479a5 Michael Hanselmann
from ganeti import ht
62 a06c6ae8 Michael Hanselmann
from ganeti import query
63 a06c6ae8 Michael Hanselmann
from ganeti import qlang
64 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
65 cffbbae7 Michael Hanselmann
from ganeti import vcluster
66 e2715f69 Michael Hanselmann
67 fbf0262f Michael Hanselmann
68 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
69 e2715f69 Michael Hanselmann
70 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
71 ebb80afa Guido Trotter
_LOCK = "_lock"
72 ebb80afa Guido Trotter
_QUEUE = "_queue"
73 99bd4f0a Guido Trotter
74 99fb250b Michael Hanselmann
#: Retrieves "id" attribute
75 99fb250b Michael Hanselmann
_GetIdAttr = operator.attrgetter("id")
76 99fb250b Michael Hanselmann
77 498ae1cc Iustin Pop
78 9728ae5d Iustin Pop
class CancelJob(Exception):
79 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
80 fbf0262f Michael Hanselmann

81 fbf0262f Michael Hanselmann
  """
82 fbf0262f Michael Hanselmann
83 fbf0262f Michael Hanselmann
84 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
85 942e2262 Michael Hanselmann
  """Special exception to abort a job when the job queue is shutting down.
86 942e2262 Michael Hanselmann

87 942e2262 Michael Hanselmann
  """
88 942e2262 Michael Hanselmann
89 942e2262 Michael Hanselmann
90 70552c46 Michael Hanselmann
def TimeStampNow():
91 ea03467c Iustin Pop
  """Returns the current timestamp.
92 ea03467c Iustin Pop

93 ea03467c Iustin Pop
  @rtype: tuple
94 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
95 ea03467c Iustin Pop

96 ea03467c Iustin Pop
  """
97 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
98 70552c46 Michael Hanselmann
99 70552c46 Michael Hanselmann
100 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
101 cffbbae7 Michael Hanselmann
  """Updates job queue file after virtualizing filename.
102 cffbbae7 Michael Hanselmann

103 cffbbae7 Michael Hanselmann
  """
104 cffbbae7 Michael Hanselmann
  virt_file_name = vcluster.MakeVirtualPath(file_name)
105 cffbbae7 Michael Hanselmann
  return runner.call_jobqueue_update(names, virt_file_name, content)
106 cffbbae7 Michael Hanselmann
107 cffbbae7 Michael Hanselmann
108 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
109 a06c6ae8 Michael Hanselmann
  """Wrapper for job queries.
110 a06c6ae8 Michael Hanselmann

111 a06c6ae8 Michael Hanselmann
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
112 a06c6ae8 Michael Hanselmann

113 a06c6ae8 Michael Hanselmann
  """
114 a06c6ae8 Michael Hanselmann
  def __init__(self, fields):
115 a06c6ae8 Michael Hanselmann
    """Initializes this class.
116 a06c6ae8 Michael Hanselmann

117 a06c6ae8 Michael Hanselmann
    """
118 a06c6ae8 Michael Hanselmann
    self._query = query.Query(query.JOB_FIELDS, fields)
119 a06c6ae8 Michael Hanselmann
120 a06c6ae8 Michael Hanselmann
  def __call__(self, job):
121 a06c6ae8 Michael Hanselmann
    """Executes a job query using cached field list.
122 a06c6ae8 Michael Hanselmann

123 a06c6ae8 Michael Hanselmann
    """
124 a06c6ae8 Michael Hanselmann
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
125 a06c6ae8 Michael Hanselmann
126 a06c6ae8 Michael Hanselmann
127 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
128 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
129 e2715f69 Michael Hanselmann

130 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
131 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
132 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
133 ea03467c Iustin Pop
  @ivar status: the current status
134 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
135 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
136 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
137 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
138 f1048938 Iustin Pop

139 e2715f69 Michael Hanselmann
  """
140 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
141 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
142 66d895a8 Iustin Pop
               "__weakref__"]
143 66d895a8 Iustin Pop
144 85f03e0d Michael Hanselmann
  def __init__(self, op):
145 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
146 ea03467c Iustin Pop

147 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
148 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
149 ea03467c Iustin Pop

150 ea03467c Iustin Pop
    """
151 85f03e0d Michael Hanselmann
    self.input = op
152 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
153 85f03e0d Michael Hanselmann
    self.result = None
154 85f03e0d Michael Hanselmann
    self.log = []
155 70552c46 Michael Hanselmann
    self.start_timestamp = None
156 b9b5abcb Iustin Pop
    self.exec_timestamp = None
157 70552c46 Michael Hanselmann
    self.end_timestamp = None
158 f1da30e6 Michael Hanselmann
159 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
160 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
161 8f5c488d Michael Hanselmann
162 f1da30e6 Michael Hanselmann
  @classmethod
163 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
164 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
165 ea03467c Iustin Pop

166 ea03467c Iustin Pop
    @type state: dict
167 ea03467c Iustin Pop
    @param state: the serialized state
168 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
169 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
170 ea03467c Iustin Pop

171 ea03467c Iustin Pop
    """
172 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
173 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
174 85f03e0d Michael Hanselmann
    obj.status = state["status"]
175 85f03e0d Michael Hanselmann
    obj.result = state["result"]
176 85f03e0d Michael Hanselmann
    obj.log = state["log"]
177 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
178 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
179 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
180 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
181 f1da30e6 Michael Hanselmann
    return obj
182 f1da30e6 Michael Hanselmann
183 f1da30e6 Michael Hanselmann
  def Serialize(self):
184 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
185 ea03467c Iustin Pop

186 ea03467c Iustin Pop
    @rtype: dict
187 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
188 ea03467c Iustin Pop

189 ea03467c Iustin Pop
    """
190 6c5a7090 Michael Hanselmann
    return {
191 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
192 6c5a7090 Michael Hanselmann
      "status": self.status,
193 6c5a7090 Michael Hanselmann
      "result": self.result,
194 6c5a7090 Michael Hanselmann
      "log": self.log,
195 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
196 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
197 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
198 8f5c488d Michael Hanselmann
      "priority": self.priority,
199 6c5a7090 Michael Hanselmann
      }
200 f1048938 Iustin Pop
201 e2715f69 Michael Hanselmann
202 e2715f69 Michael Hanselmann
class _QueuedJob(object):
203 e2715f69 Michael Hanselmann
  """In-memory job representation.
204 e2715f69 Michael Hanselmann

205 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
206 ea03467c Iustin Pop
  be taken care of by users of this class.
207 ea03467c Iustin Pop

208 ea03467c Iustin Pop
  @type queue: L{JobQueue}
209 ea03467c Iustin Pop
  @ivar queue: the parent queue
210 ea03467c Iustin Pop
  @ivar id: the job ID
211 ea03467c Iustin Pop
  @type ops: list
212 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
213 ea03467c Iustin Pop
  @type log_serial: int
214 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
215 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
216 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
217 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
218 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
219 e2715f69 Michael Hanselmann

220 e2715f69 Michael Hanselmann
  """
221 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
222 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
223 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
224 8a3cd185 Michael Hanselmann
               "__weakref__", "processor_lock", "writable", "archived"]
225 66d895a8 Iustin Pop
226 e0f2bf1e Michele Tartara
  def _AddReasons(self):
227 e0f2bf1e Michele Tartara
    """Extend the reason trail
228 e0f2bf1e Michele Tartara

229 e0f2bf1e Michele Tartara
    Add the reason for all the opcodes of this job to be executed.
230 e0f2bf1e Michele Tartara

231 e0f2bf1e Michele Tartara
    """
232 e0f2bf1e Michele Tartara
    count = 0
233 e0f2bf1e Michele Tartara
    for queued_op in self.ops:
234 e0f2bf1e Michele Tartara
      op = queued_op.input
235 580b1fdd Jose A. Lopes
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
236 e0f2bf1e Michele Tartara
      reason_text = "job=%d;index=%d" % (self.id, count)
237 e0f2bf1e Michele Tartara
      reason = getattr(op, "reason", [])
238 e0f2bf1e Michele Tartara
      reason.append((reason_src, reason_text, utils.EpochNano()))
239 e0f2bf1e Michele Tartara
      op.reason = reason
240 e0f2bf1e Michele Tartara
      count = count + 1
241 e0f2bf1e Michele Tartara
242 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
243 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
244 ea03467c Iustin Pop

245 ea03467c Iustin Pop
    @type queue: L{JobQueue}
246 ea03467c Iustin Pop
    @param queue: our parent queue
247 ea03467c Iustin Pop
    @type job_id: job_id
248 ea03467c Iustin Pop
    @param job_id: our job id
249 ea03467c Iustin Pop
    @type ops: list
250 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
251 ea03467c Iustin Pop
        in _QueuedOpCodes
252 c0f6d0d8 Michael Hanselmann
    @type writable: bool
253 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
254 ea03467c Iustin Pop

255 ea03467c Iustin Pop
    """
256 e2715f69 Michael Hanselmann
    if not ops:
257 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
258 e2715f69 Michael Hanselmann
259 85f03e0d Michael Hanselmann
    self.queue = queue
260 76b62028 Iustin Pop
    self.id = int(job_id)
261 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
262 e0f2bf1e Michele Tartara
    self._AddReasons()
263 6c5a7090 Michael Hanselmann
    self.log_serial = 0
264 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
265 c56ec146 Iustin Pop
    self.start_timestamp = None
266 c56ec146 Iustin Pop
    self.end_timestamp = None
267 8a3cd185 Michael Hanselmann
    self.archived = False
268 6c5a7090 Michael Hanselmann
269 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
270 fa4aa6b4 Michael Hanselmann
271 8a3cd185 Michael Hanselmann
    assert not self.archived, "New jobs can not be marked as archived"
272 8a3cd185 Michael Hanselmann
273 fa4aa6b4 Michael Hanselmann
  @staticmethod
274 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
275 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
276 fa4aa6b4 Michael Hanselmann

277 fa4aa6b4 Michael Hanselmann
    """
278 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
279 03b63608 Michael Hanselmann
    obj.ops_iter = None
280 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
281 f8a4adfa Michael Hanselmann
282 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
283 f8a4adfa Michael Hanselmann
    if writable:
284 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
285 f8a4adfa Michael Hanselmann
    else:
286 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
287 be760ba8 Michael Hanselmann
288 9fa2e150 Michael Hanselmann
  def __repr__(self):
289 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
290 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
291 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
292 9fa2e150 Michael Hanselmann
293 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
294 9fa2e150 Michael Hanselmann
295 f1da30e6 Michael Hanselmann
  @classmethod
296 8a3cd185 Michael Hanselmann
  def Restore(cls, queue, state, writable, archived):
297 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
298 ea03467c Iustin Pop

299 ea03467c Iustin Pop
    @type queue: L{JobQueue}
300 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
301 ea03467c Iustin Pop
    @type state: dict
302 ea03467c Iustin Pop
    @param state: the serialized state
303 c0f6d0d8 Michael Hanselmann
    @type writable: bool
304 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
305 8a3cd185 Michael Hanselmann
    @type archived: bool
306 8a3cd185 Michael Hanselmann
    @param archived: Whether job was already archived
307 ea03467c Iustin Pop
    @rtype: _JobQueue
308 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
309 ea03467c Iustin Pop

310 ea03467c Iustin Pop
    """
311 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
312 85f03e0d Michael Hanselmann
    obj.queue = queue
313 76b62028 Iustin Pop
    obj.id = int(state["id"])
314 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
315 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
316 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
317 8a3cd185 Michael Hanselmann
    obj.archived = archived
318 6c5a7090 Michael Hanselmann
319 6c5a7090 Michael Hanselmann
    obj.ops = []
320 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
321 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
322 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
323 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
324 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
325 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
326 6c5a7090 Michael Hanselmann
327 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
328 be760ba8 Michael Hanselmann
329 f1da30e6 Michael Hanselmann
    return obj
330 f1da30e6 Michael Hanselmann
331 f1da30e6 Michael Hanselmann
  def Serialize(self):
332 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
333 ea03467c Iustin Pop

334 ea03467c Iustin Pop
    @rtype: dict
335 ea03467c Iustin Pop
    @return: the serialized state
336 ea03467c Iustin Pop

337 ea03467c Iustin Pop
    """
338 f1da30e6 Michael Hanselmann
    return {
339 f1da30e6 Michael Hanselmann
      "id": self.id,
340 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
341 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
342 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
343 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
344 f1da30e6 Michael Hanselmann
      }
345 f1da30e6 Michael Hanselmann
346 85f03e0d Michael Hanselmann
  def CalcStatus(self):
347 ea03467c Iustin Pop
    """Compute the status of this job.
348 ea03467c Iustin Pop

349 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
350 ea03467c Iustin Pop
    based on their status, computes the job status.
351 ea03467c Iustin Pop

352 ea03467c Iustin Pop
    The algorithm is:
353 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
354 ea03467c Iustin Pop
        status will be the same
355 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
356 ea03467c Iustin Pop
          - waitlock
357 fbf0262f Michael Hanselmann
          - canceling
358 ea03467c Iustin Pop
          - running
359 ea03467c Iustin Pop

360 ea03467c Iustin Pop
        will determine the job status
361 ea03467c Iustin Pop

362 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
363 ea03467c Iustin Pop
        and the job status will be the same
364 ea03467c Iustin Pop

365 ea03467c Iustin Pop
    @return: the job status
366 ea03467c Iustin Pop

367 ea03467c Iustin Pop
    """
368 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
369 e2715f69 Michael Hanselmann
370 e2715f69 Michael Hanselmann
    all_success = True
371 85f03e0d Michael Hanselmann
    for op in self.ops:
372 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
373 e2715f69 Michael Hanselmann
        continue
374 e2715f69 Michael Hanselmann
375 e2715f69 Michael Hanselmann
      all_success = False
376 e2715f69 Michael Hanselmann
377 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
378 e2715f69 Michael Hanselmann
        pass
379 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
380 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
381 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
382 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
383 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
384 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
385 fbf0262f Michael Hanselmann
        break
386 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
387 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
388 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
389 f1da30e6 Michael Hanselmann
        break
390 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
391 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
392 4cb1d919 Michael Hanselmann
        break
393 e2715f69 Michael Hanselmann
394 e2715f69 Michael Hanselmann
    if all_success:
395 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
396 e2715f69 Michael Hanselmann
397 e2715f69 Michael Hanselmann
    return status
398 e2715f69 Michael Hanselmann
399 8f5c488d Michael Hanselmann
  def CalcPriority(self):
400 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
401 8f5c488d Michael Hanselmann

402 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
403 8f5c488d Michael Hanselmann
    priority is used.
404 8f5c488d Michael Hanselmann

405 8f5c488d Michael Hanselmann
    @rtype: int
406 8f5c488d Michael Hanselmann

407 8f5c488d Michael Hanselmann
    """
408 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
409 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
410 8f5c488d Michael Hanselmann
411 8f5c488d Michael Hanselmann
    if not priorities:
412 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
413 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
414 8f5c488d Michael Hanselmann
415 8f5c488d Michael Hanselmann
    return min(priorities)
416 8f5c488d Michael Hanselmann
417 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
418 ea03467c Iustin Pop
    """Selectively returns the log entries.
419 ea03467c Iustin Pop

420 ea03467c Iustin Pop
    @type newer_than: None or int
421 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
422 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
423 ea03467c Iustin Pop
        than this value
424 ea03467c Iustin Pop
    @rtype: list
425 ea03467c Iustin Pop
    @return: the list of the log entries selected
426 ea03467c Iustin Pop

427 ea03467c Iustin Pop
    """
428 6c5a7090 Michael Hanselmann
    if newer_than is None:
429 6c5a7090 Michael Hanselmann
      serial = -1
430 6c5a7090 Michael Hanselmann
    else:
431 6c5a7090 Michael Hanselmann
      serial = newer_than
432 6c5a7090 Michael Hanselmann
433 6c5a7090 Michael Hanselmann
    entries = []
434 6c5a7090 Michael Hanselmann
    for op in self.ops:
435 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
436 6c5a7090 Michael Hanselmann
437 6c5a7090 Michael Hanselmann
    return entries
438 6c5a7090 Michael Hanselmann
439 6a290889 Guido Trotter
  def GetInfo(self, fields):
440 6a290889 Guido Trotter
    """Returns information about a job.
441 6a290889 Guido Trotter

442 6a290889 Guido Trotter
    @type fields: list
443 6a290889 Guido Trotter
    @param fields: names of fields to return
444 6a290889 Guido Trotter
    @rtype: list
445 6a290889 Guido Trotter
    @return: list with one element for each field
446 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
447 6a290889 Guido Trotter
        has been passed
448 6a290889 Guido Trotter

449 6a290889 Guido Trotter
    """
450 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
451 6a290889 Guido Trotter
452 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
453 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
454 34327f51 Iustin Pop

455 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
456 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
457 34327f51 Iustin Pop
    finalised are not changed.
458 34327f51 Iustin Pop

459 34327f51 Iustin Pop
    @param status: a given opcode status
460 34327f51 Iustin Pop
    @param result: the opcode result
461 34327f51 Iustin Pop

462 34327f51 Iustin Pop
    """
463 747f6113 Michael Hanselmann
    not_marked = True
464 747f6113 Michael Hanselmann
    for op in self.ops:
465 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
466 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
467 747f6113 Michael Hanselmann
        continue
468 747f6113 Michael Hanselmann
      op.status = status
469 747f6113 Michael Hanselmann
      op.result = result
470 747f6113 Michael Hanselmann
      not_marked = False
471 34327f51 Iustin Pop
472 66bd7445 Michael Hanselmann
  def Finalize(self):
473 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
474 66bd7445 Michael Hanselmann

475 66bd7445 Michael Hanselmann
    """
476 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
477 66bd7445 Michael Hanselmann
478 099b2870 Michael Hanselmann
  def Cancel(self):
479 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
480 a0d2fe2c Michael Hanselmann

481 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
482 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
483 a0d2fe2c Michael Hanselmann
      as canceling and a text message
484 a0d2fe2c Michael Hanselmann

485 a0d2fe2c Michael Hanselmann
    """
486 099b2870 Michael Hanselmann
    status = self.CalcStatus()
487 099b2870 Michael Hanselmann
488 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
489 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
490 099b2870 Michael Hanselmann
                             "Job canceled by request")
491 66bd7445 Michael Hanselmann
      self.Finalize()
492 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
493 099b2870 Michael Hanselmann
494 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
495 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
496 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
497 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
498 099b2870 Michael Hanselmann
499 86b16e9d Michael Hanselmann
    else:
500 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
501 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
502 099b2870 Michael Hanselmann
503 4679547e Michael Hanselmann
  def ChangePriority(self, priority):
504 4679547e Michael Hanselmann
    """Changes the job priority.
505 4679547e Michael Hanselmann

506 4679547e Michael Hanselmann
    @type priority: int
507 4679547e Michael Hanselmann
    @param priority: New priority
508 4679547e Michael Hanselmann
    @rtype: tuple; (bool, string)
509 4679547e Michael Hanselmann
    @return: Boolean describing whether job's priority was successfully changed
510 4679547e Michael Hanselmann
      and a text message
511 4679547e Michael Hanselmann

512 4679547e Michael Hanselmann
    """
513 4679547e Michael Hanselmann
    status = self.CalcStatus()
514 4679547e Michael Hanselmann
515 4679547e Michael Hanselmann
    if status in constants.JOBS_FINALIZED:
516 4679547e Michael Hanselmann
      return (False, "Job %s is finished" % self.id)
517 4679547e Michael Hanselmann
    elif status == constants.JOB_STATUS_CANCELING:
518 4679547e Michael Hanselmann
      return (False, "Job %s is cancelling" % self.id)
519 4679547e Michael Hanselmann
    else:
520 4679547e Michael Hanselmann
      assert status in (constants.JOB_STATUS_QUEUED,
521 4679547e Michael Hanselmann
                        constants.JOB_STATUS_WAITING,
522 4679547e Michael Hanselmann
                        constants.JOB_STATUS_RUNNING)
523 4679547e Michael Hanselmann
524 4679547e Michael Hanselmann
      changed = False
525 4679547e Michael Hanselmann
      for op in self.ops:
526 4679547e Michael Hanselmann
        if (op.status == constants.OP_STATUS_RUNNING or
527 4679547e Michael Hanselmann
            op.status in constants.OPS_FINALIZED):
528 4679547e Michael Hanselmann
          assert not changed, \
529 4679547e Michael Hanselmann
            ("Found opcode for which priority should not be changed after"
530 4679547e Michael Hanselmann
             " priority has been changed for previous opcodes")
531 4679547e Michael Hanselmann
          continue
532 4679547e Michael Hanselmann
533 4679547e Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
534 4679547e Michael Hanselmann
                             constants.OP_STATUS_WAITING)
535 4679547e Michael Hanselmann
536 4679547e Michael Hanselmann
        changed = True
537 4679547e Michael Hanselmann
538 3c631ea2 Michael Hanselmann
        # Set new priority (doesn't modify opcode input)
539 4679547e Michael Hanselmann
        op.priority = priority
540 4679547e Michael Hanselmann
541 4679547e Michael Hanselmann
      if changed:
542 4679547e Michael Hanselmann
        return (True, ("Priorities of pending opcodes for job %s have been"
543 4679547e Michael Hanselmann
                       " changed to %s" % (self.id, priority)))
544 4679547e Michael Hanselmann
      else:
545 4679547e Michael Hanselmann
        return (False, "Job %s had no pending opcodes" % self.id)
546 4679547e Michael Hanselmann
547 f1048938 Iustin Pop
548 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
549 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
550 031a3e57 Michael Hanselmann
    """Initializes this class.
551 ea03467c Iustin Pop

552 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
553 031a3e57 Michael Hanselmann
    @param queue: Job queue
554 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
555 031a3e57 Michael Hanselmann
    @param job: Job object
556 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
557 031a3e57 Michael Hanselmann
    @param op: OpCode
558 031a3e57 Michael Hanselmann

559 031a3e57 Michael Hanselmann
    """
560 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
561 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
562 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
563 031a3e57 Michael Hanselmann
564 031a3e57 Michael Hanselmann
    self._queue = queue
565 031a3e57 Michael Hanselmann
    self._job = job
566 031a3e57 Michael Hanselmann
    self._op = op
567 031a3e57 Michael Hanselmann
568 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
569 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
570 dc1e2262 Michael Hanselmann

571 dc1e2262 Michael Hanselmann
    """
572 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
573 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
574 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
575 dc1e2262 Michael Hanselmann
      raise CancelJob()
576 dc1e2262 Michael Hanselmann
577 942e2262 Michael Hanselmann
    # See if queue is shutting down
578 942e2262 Michael Hanselmann
    if not self._queue.AcceptingJobsUnlocked():
579 942e2262 Michael Hanselmann
      logging.debug("Queue is shutting down")
580 942e2262 Michael Hanselmann
      raise QueueShutdown()
581 942e2262 Michael Hanselmann
582 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
583 031a3e57 Michael Hanselmann
  def NotifyStart(self):
584 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
585 e92376d7 Iustin Pop

586 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
587 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
588 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
589 47099cd1 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
590 e92376d7 Iustin Pop

591 e92376d7 Iustin Pop
    """
592 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
593 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
594 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
595 fbf0262f Michael Hanselmann
596 271daef8 Iustin Pop
    # Cancel here if we were asked to
597 dc1e2262 Michael Hanselmann
    self._CheckCancel()
598 fbf0262f Michael Hanselmann
599 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
600 9bdab621 Michael Hanselmann
601 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
602 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
603 271daef8 Iustin Pop
604 271daef8 Iustin Pop
    # And finally replicate the job status
605 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
606 031a3e57 Michael Hanselmann
607 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
608 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
609 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
610 9bf5e01f Guido Trotter

611 9bf5e01f Guido Trotter
    """
612 9bf5e01f Guido Trotter
    self._job.log_serial += 1
613 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
614 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
615 9bf5e01f Guido Trotter
616 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
617 031a3e57 Michael Hanselmann
    """Append a log entry.
618 031a3e57 Michael Hanselmann

619 031a3e57 Michael Hanselmann
    """
620 031a3e57 Michael Hanselmann
    assert len(args) < 3
621 031a3e57 Michael Hanselmann
622 031a3e57 Michael Hanselmann
    if len(args) == 1:
623 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
624 031a3e57 Michael Hanselmann
      log_msg = args[0]
625 031a3e57 Michael Hanselmann
    else:
626 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
627 031a3e57 Michael Hanselmann
628 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
629 031a3e57 Michael Hanselmann
    # precision.
630 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
631 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
632 031a3e57 Michael Hanselmann
633 e4e59de8 Michael Hanselmann
  def CurrentPriority(self):
634 e4e59de8 Michael Hanselmann
    """Returns current priority for opcode.
635 ef2df7d3 Michael Hanselmann

636 ef2df7d3 Michael Hanselmann
    """
637 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
638 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
639 dc1e2262 Michael Hanselmann
640 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
641 dc1e2262 Michael Hanselmann
    self._CheckCancel()
642 dc1e2262 Michael Hanselmann
643 e4e59de8 Michael Hanselmann
    return self._op.priority
644 e4e59de8 Michael Hanselmann
645 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
646 6a373640 Michael Hanselmann
    """Submits jobs for processing.
647 6a373640 Michael Hanselmann

648 6a373640 Michael Hanselmann
    See L{JobQueue.SubmitManyJobs}.
649 6a373640 Michael Hanselmann

650 6a373640 Michael Hanselmann
    """
651 6a373640 Michael Hanselmann
    # Locking is done in job queue
652 6a373640 Michael Hanselmann
    return self._queue.SubmitManyJobs(jobs)
653 6a373640 Michael Hanselmann
654 031a3e57 Michael Hanselmann
655 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
656 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
657 989a8bee Michael Hanselmann
    """Initializes this class.
658 6c2549d6 Guido Trotter

659 989a8bee Michael Hanselmann
    @type fields: list of strings
660 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
661 989a8bee Michael Hanselmann
    @type prev_job_info: string
662 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
663 989a8bee Michael Hanselmann
    @type prev_log_serial: string
664 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
665 6c2549d6 Guido Trotter

666 989a8bee Michael Hanselmann
    """
667 dc2879ea Michael Hanselmann
    self._squery = _SimpleJobQuery(fields)
668 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
669 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
670 6c2549d6 Guido Trotter
671 989a8bee Michael Hanselmann
  def __call__(self, job):
672 989a8bee Michael Hanselmann
    """Checks whether job has changed.
673 6c2549d6 Guido Trotter

674 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
675 989a8bee Michael Hanselmann
    @param job: Job object
676 6c2549d6 Guido Trotter

677 6c2549d6 Guido Trotter
    """
678 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Expected read-only job"
679 c0f6d0d8 Michael Hanselmann
680 989a8bee Michael Hanselmann
    status = job.CalcStatus()
681 dc2879ea Michael Hanselmann
    job_info = self._squery(job)
682 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
683 6c2549d6 Guido Trotter
684 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
685 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
686 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
687 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
688 6c2549d6 Guido Trotter
    # significantly different.
689 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
690 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
691 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
692 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
693 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
694 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
695 6c2549d6 Guido Trotter
696 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
697 6c2549d6 Guido Trotter
    # no changes.
698 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
699 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
700 47099cd1 Michael Hanselmann
                       constants.JOB_STATUS_WAITING) or
701 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
702 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
703 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
704 989a8bee Michael Hanselmann
      return (job_info, log_entries)
705 6c2549d6 Guido Trotter
706 989a8bee Michael Hanselmann
    return None
707 989a8bee Michael Hanselmann
708 989a8bee Michael Hanselmann
709 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
710 383477e9 Michael Hanselmann
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
711 989a8bee Michael Hanselmann
    """Initializes this class.
712 989a8bee Michael Hanselmann

713 989a8bee Michael Hanselmann
    @type filename: string
714 989a8bee Michael Hanselmann
    @param filename: Path to job file
715 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
716 6c2549d6 Guido Trotter

717 989a8bee Michael Hanselmann
    """
718 383477e9 Michael Hanselmann
    self._wm = _inotify_wm_cls()
719 989a8bee Michael Hanselmann
    self._inotify_handler = \
720 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
721 989a8bee Michael Hanselmann
    self._notifier = \
722 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
723 989a8bee Michael Hanselmann
    try:
724 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
725 989a8bee Michael Hanselmann
    except Exception:
726 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
727 989a8bee Michael Hanselmann
      self._notifier.stop()
728 989a8bee Michael Hanselmann
      raise
729 989a8bee Michael Hanselmann
730 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
731 989a8bee Michael Hanselmann
    """Callback for inotify.
732 989a8bee Michael Hanselmann

733 989a8bee Michael Hanselmann
    """
734 6c2549d6 Guido Trotter
    if not notifier_enabled:
735 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
736 989a8bee Michael Hanselmann
737 989a8bee Michael Hanselmann
  def Wait(self, timeout):
738 989a8bee Michael Hanselmann
    """Waits for the job file to change.
739 989a8bee Michael Hanselmann

740 989a8bee Michael Hanselmann
    @type timeout: float
741 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
742 989a8bee Michael Hanselmann
    @return: Whether there have been events
743 989a8bee Michael Hanselmann

744 989a8bee Michael Hanselmann
    """
745 989a8bee Michael Hanselmann
    assert timeout >= 0
746 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
747 989a8bee Michael Hanselmann
    if have_events:
748 989a8bee Michael Hanselmann
      self._notifier.read_events()
749 989a8bee Michael Hanselmann
    self._notifier.process_events()
750 989a8bee Michael Hanselmann
    return have_events
751 989a8bee Michael Hanselmann
752 989a8bee Michael Hanselmann
  def Close(self):
753 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
754 989a8bee Michael Hanselmann

755 989a8bee Michael Hanselmann
    """
756 989a8bee Michael Hanselmann
    self._notifier.stop()
757 989a8bee Michael Hanselmann
758 989a8bee Michael Hanselmann
759 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
760 383477e9 Michael Hanselmann
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
761 989a8bee Michael Hanselmann
    """Initializes this class.
762 989a8bee Michael Hanselmann

763 989a8bee Michael Hanselmann
    @type filename: string
764 989a8bee Michael Hanselmann
    @param filename: Path to job file
765 989a8bee Michael Hanselmann

766 989a8bee Michael Hanselmann
    """
767 989a8bee Michael Hanselmann
    self._filewaiter = None
768 989a8bee Michael Hanselmann
    self._filename = filename
769 383477e9 Michael Hanselmann
    self._waiter_cls = _waiter_cls
770 6c2549d6 Guido Trotter
771 989a8bee Michael Hanselmann
  def Wait(self, timeout):
772 989a8bee Michael Hanselmann
    """Waits for a job to change.
773 6c2549d6 Guido Trotter

774 989a8bee Michael Hanselmann
    @type timeout: float
775 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
776 989a8bee Michael Hanselmann
    @return: Whether there have been events
777 989a8bee Michael Hanselmann

778 989a8bee Michael Hanselmann
    """
779 989a8bee Michael Hanselmann
    if self._filewaiter:
780 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
781 989a8bee Michael Hanselmann
782 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
783 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
784 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
785 989a8bee Michael Hanselmann
    # race condition.
786 383477e9 Michael Hanselmann
    self._filewaiter = self._waiter_cls(self._filename)
787 989a8bee Michael Hanselmann
788 989a8bee Michael Hanselmann
    return True
789 989a8bee Michael Hanselmann
790 989a8bee Michael Hanselmann
  def Close(self):
791 989a8bee Michael Hanselmann
    """Closes underlying waiter.
792 989a8bee Michael Hanselmann

793 989a8bee Michael Hanselmann
    """
794 989a8bee Michael Hanselmann
    if self._filewaiter:
795 989a8bee Michael Hanselmann
      self._filewaiter.Close()
796 989a8bee Michael Hanselmann
797 989a8bee Michael Hanselmann
798 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
799 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
800 989a8bee Michael Hanselmann

801 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
802 989a8bee Michael Hanselmann
  the current job status has changed.
803 989a8bee Michael Hanselmann

804 989a8bee Michael Hanselmann
  """
805 989a8bee Michael Hanselmann
  @staticmethod
806 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
807 dfc8824a Michael Hanselmann
    if counter.next() > 0:
808 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
809 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
810 dfc8824a Michael Hanselmann
      # changes/messages.
811 dfc8824a Michael Hanselmann
      time.sleep(0.1)
812 dfc8824a Michael Hanselmann
813 989a8bee Michael Hanselmann
    job = job_load_fn()
814 989a8bee Michael Hanselmann
    if not job:
815 989a8bee Michael Hanselmann
      raise errors.JobLost()
816 989a8bee Michael Hanselmann
817 989a8bee Michael Hanselmann
    result = check_fn(job)
818 989a8bee Michael Hanselmann
    if result is None:
819 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
820 989a8bee Michael Hanselmann
821 989a8bee Michael Hanselmann
    return result
822 989a8bee Michael Hanselmann
823 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
824 383477e9 Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout,
825 383477e9 Michael Hanselmann
               _waiter_cls=_JobChangesWaiter):
826 989a8bee Michael Hanselmann
    """Waits for changes on a job.
827 989a8bee Michael Hanselmann

828 989a8bee Michael Hanselmann
    @type filename: string
829 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
830 989a8bee Michael Hanselmann
    @type job_load_fn: callable
831 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
832 989a8bee Michael Hanselmann
    @type fields: list of strings
833 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
834 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
835 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
836 989a8bee Michael Hanselmann
    @type prev_log_serial: int
837 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
838 989a8bee Michael Hanselmann
    @type timeout: float
839 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
840 989a8bee Michael Hanselmann

841 989a8bee Michael Hanselmann
    """
842 dfc8824a Michael Hanselmann
    counter = itertools.count()
843 6c2549d6 Guido Trotter
    try:
844 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
845 383477e9 Michael Hanselmann
      waiter = _waiter_cls(filename)
846 989a8bee Michael Hanselmann
      try:
847 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
848 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
849 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
850 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
851 989a8bee Michael Hanselmann
      finally:
852 989a8bee Michael Hanselmann
        waiter.Close()
853 383477e9 Michael Hanselmann
    except errors.JobLost:
854 6c2549d6 Guido Trotter
      return None
855 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
856 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
857 6c2549d6 Guido Trotter
858 6c2549d6 Guido Trotter
859 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
860 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
861 6760e4ed Michael Hanselmann

862 6760e4ed Michael Hanselmann
  """
863 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
864 6760e4ed Michael Hanselmann
    to_encode = err
865 6760e4ed Michael Hanselmann
  else:
866 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
867 6760e4ed Michael Hanselmann
868 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
869 6760e4ed Michael Hanselmann
870 6760e4ed Michael Hanselmann
871 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
872 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
873 26d3fd2f Michael Hanselmann
    """Initializes this class.
874 26d3fd2f Michael Hanselmann

875 26d3fd2f Michael Hanselmann
    """
876 26d3fd2f Michael Hanselmann
    self._fn = fn
877 26d3fd2f Michael Hanselmann
    self._next = None
878 26d3fd2f Michael Hanselmann
879 26d3fd2f Michael Hanselmann
  def _Advance(self):
880 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
881 26d3fd2f Michael Hanselmann

882 26d3fd2f Michael Hanselmann
    """
883 26d3fd2f Michael Hanselmann
    if self._next is None:
884 26d3fd2f Michael Hanselmann
      self._next = self._fn()
885 26d3fd2f Michael Hanselmann
886 26d3fd2f Michael Hanselmann
  def Peek(self):
887 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
888 26d3fd2f Michael Hanselmann

889 26d3fd2f Michael Hanselmann
    """
890 26d3fd2f Michael Hanselmann
    self._Advance()
891 26d3fd2f Michael Hanselmann
    return self._next
892 26d3fd2f Michael Hanselmann
893 26d3fd2f Michael Hanselmann
  def Next(self):
894 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
895 26d3fd2f Michael Hanselmann

896 26d3fd2f Michael Hanselmann
    """
897 26d3fd2f Michael Hanselmann
    self._Advance()
898 26d3fd2f Michael Hanselmann
    result = self._next
899 26d3fd2f Michael Hanselmann
    self._next = None
900 26d3fd2f Michael Hanselmann
    return result
901 26d3fd2f Michael Hanselmann
902 26d3fd2f Michael Hanselmann
903 b80cc518 Michael Hanselmann
class _OpExecContext:
904 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
905 b80cc518 Michael Hanselmann
    """Initializes this class.
906 b80cc518 Michael Hanselmann

907 b80cc518 Michael Hanselmann
    """
908 b80cc518 Michael Hanselmann
    self.op = op
909 b80cc518 Michael Hanselmann
    self.index = index
910 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
911 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
912 b80cc518 Michael Hanselmann
913 b95479a5 Michael Hanselmann
    # Create local copy to modify
914 580b1fdd Jose A. Lopes
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
915 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
916 b95479a5 Michael Hanselmann
    else:
917 b95479a5 Michael Hanselmann
      self.jobdeps = None
918 b95479a5 Michael Hanselmann
919 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
920 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
921 26d3fd2f Michael Hanselmann
922 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
923 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
924 26d3fd2f Michael Hanselmann

925 26d3fd2f Michael Hanselmann
    """
926 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
927 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
928 26d3fd2f Michael Hanselmann
929 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
930 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
931 26d3fd2f Michael Hanselmann

932 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
933 26d3fd2f Michael Hanselmann

934 26d3fd2f Michael Hanselmann
    """
935 26d3fd2f Michael Hanselmann
    op = self.op
936 26d3fd2f Michael Hanselmann
937 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
938 26d3fd2f Michael Hanselmann
    # for locks?
939 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
940 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
941 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
942 26d3fd2f Michael Hanselmann
      op.priority -= 1
943 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
944 26d3fd2f Michael Hanselmann
      return True
945 26d3fd2f Michael Hanselmann
946 26d3fd2f Michael Hanselmann
    return False
947 26d3fd2f Michael Hanselmann
948 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
949 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
950 26d3fd2f Michael Hanselmann

951 26d3fd2f Michael Hanselmann
    """
952 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
953 26d3fd2f Michael Hanselmann
954 b80cc518 Michael Hanselmann
955 be760ba8 Michael Hanselmann
class _JobProcessor(object):
956 75d81fc8 Michael Hanselmann
  (DEFER,
957 75d81fc8 Michael Hanselmann
   WAITDEP,
958 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
959 75d81fc8 Michael Hanselmann
960 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
961 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
962 be760ba8 Michael Hanselmann
    """Initializes this class.
963 be760ba8 Michael Hanselmann

964 be760ba8 Michael Hanselmann
    """
965 be760ba8 Michael Hanselmann
    self.queue = queue
966 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
967 be760ba8 Michael Hanselmann
    self.job = job
968 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
969 be760ba8 Michael Hanselmann
970 be760ba8 Michael Hanselmann
  @staticmethod
971 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
972 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
973 be760ba8 Michael Hanselmann

974 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
975 be760ba8 Michael Hanselmann
    @param job: Job object
976 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
977 be760ba8 Michael Hanselmann

978 be760ba8 Michael Hanselmann
    """
979 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
980 be760ba8 Michael Hanselmann
    # lookups
981 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
982 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
983 03b63608 Michael Hanselmann
    if job.ops_iter is None:
984 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
985 be760ba8 Michael Hanselmann
986 be760ba8 Michael Hanselmann
    # Find next opcode to run
987 be760ba8 Michael Hanselmann
    while True:
988 be760ba8 Michael Hanselmann
      try:
989 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
990 be760ba8 Michael Hanselmann
      except StopIteration:
991 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
992 be760ba8 Michael Hanselmann
993 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
994 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
995 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
996 be760ba8 Michael Hanselmann
997 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
998 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
999 be760ba8 Michael Hanselmann
1000 66bd7445 Michael Hanselmann
      if op.status not in constants.OPS_FINALIZED:
1001 66bd7445 Michael Hanselmann
        return opctx
1002 be760ba8 Michael Hanselmann
1003 66bd7445 Michael Hanselmann
      # This is a job that was partially completed before master daemon
1004 66bd7445 Michael Hanselmann
      # shutdown, so it can be expected that some opcodes are already
1005 66bd7445 Michael Hanselmann
      # completed successfully (if any did error out, then the whole job
1006 66bd7445 Michael Hanselmann
      # should have been aborted and not resubmitted for processing).
1007 66bd7445 Michael Hanselmann
      logging.info("%s: opcode %s already processed, skipping",
1008 66bd7445 Michael Hanselmann
                   opctx.log_prefix, opctx.summary)
1009 be760ba8 Michael Hanselmann
1010 be760ba8 Michael Hanselmann
  @staticmethod
1011 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
1012 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
1013 be760ba8 Michael Hanselmann

1014 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
1015 be760ba8 Michael Hanselmann

1016 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
1017 be760ba8 Michael Hanselmann
    @param job: Job object
1018 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
1019 a38e8674 Michael Hanselmann
    @param op: Opcode object
1020 be760ba8 Michael Hanselmann

1021 be760ba8 Michael Hanselmann
    """
1022 be760ba8 Michael Hanselmann
    assert op in job.ops
1023 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
1024 47099cd1 Michael Hanselmann
                         constants.OP_STATUS_WAITING)
1025 5fd6b694 Michael Hanselmann
1026 5fd6b694 Michael Hanselmann
    update = False
1027 be760ba8 Michael Hanselmann
1028 be760ba8 Michael Hanselmann
    op.result = None
1029 5fd6b694 Michael Hanselmann
1030 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
1031 47099cd1 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITING
1032 5fd6b694 Michael Hanselmann
      update = True
1033 5fd6b694 Michael Hanselmann
1034 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
1035 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
1036 5fd6b694 Michael Hanselmann
      update = True
1037 be760ba8 Michael Hanselmann
1038 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
1039 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
1040 5fd6b694 Michael Hanselmann
      update = True
1041 5fd6b694 Michael Hanselmann
1042 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1043 5fd6b694 Michael Hanselmann
1044 5fd6b694 Michael Hanselmann
    return update
1045 be760ba8 Michael Hanselmann
1046 b95479a5 Michael Hanselmann
  @staticmethod
1047 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
1048 b95479a5 Michael Hanselmann
    """Checks if an opcode has dependencies and if so, processes them.
1049 b95479a5 Michael Hanselmann

1050 b95479a5 Michael Hanselmann
    @type queue: L{JobQueue}
1051 b95479a5 Michael Hanselmann
    @param queue: Queue object
1052 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1053 b95479a5 Michael Hanselmann
    @param job: Job object
1054 b95479a5 Michael Hanselmann
    @type opctx: L{_OpExecContext}
1055 b95479a5 Michael Hanselmann
    @param opctx: Opcode execution context
1056 b95479a5 Michael Hanselmann
    @rtype: bool
1057 b95479a5 Michael Hanselmann
    @return: Whether opcode will be re-scheduled by dependency tracker
1058 b95479a5 Michael Hanselmann

1059 b95479a5 Michael Hanselmann
    """
1060 b95479a5 Michael Hanselmann
    op = opctx.op
1061 b95479a5 Michael Hanselmann
1062 b95479a5 Michael Hanselmann
    result = False
1063 b95479a5 Michael Hanselmann
1064 b95479a5 Michael Hanselmann
    while opctx.jobdeps:
1065 b95479a5 Michael Hanselmann
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1066 b95479a5 Michael Hanselmann
1067 b95479a5 Michael Hanselmann
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1068 b95479a5 Michael Hanselmann
                                                          dep_status)
1069 b95479a5 Michael Hanselmann
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1070 b95479a5 Michael Hanselmann
1071 b95479a5 Michael Hanselmann
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1072 b95479a5 Michael Hanselmann
1073 b95479a5 Michael Hanselmann
      if depresult == _JobDependencyManager.CONTINUE:
1074 b95479a5 Michael Hanselmann
        # Remove dependency and continue
1075 b95479a5 Michael Hanselmann
        opctx.jobdeps.pop(0)
1076 b95479a5 Michael Hanselmann
1077 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.WAIT:
1078 b95479a5 Michael Hanselmann
        # Need to wait for notification, dependency tracker will re-add job
1079 b95479a5 Michael Hanselmann
        # to workerpool
1080 b95479a5 Michael Hanselmann
        result = True
1081 b95479a5 Michael Hanselmann
        break
1082 b95479a5 Michael Hanselmann
1083 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.CANCEL:
1084 b95479a5 Michael Hanselmann
        # Job was cancelled, cancel this job as well
1085 b95479a5 Michael Hanselmann
        job.Cancel()
1086 b95479a5 Michael Hanselmann
        assert op.status == constants.OP_STATUS_CANCELING
1087 b95479a5 Michael Hanselmann
        break
1088 b95479a5 Michael Hanselmann
1089 b95479a5 Michael Hanselmann
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1090 b95479a5 Michael Hanselmann
                         _JobDependencyManager.ERROR):
1091 b95479a5 Michael Hanselmann
        # Job failed or there was an error, this job must fail
1092 b95479a5 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1093 b95479a5 Michael Hanselmann
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1094 b95479a5 Michael Hanselmann
        break
1095 b95479a5 Michael Hanselmann
1096 b95479a5 Michael Hanselmann
      else:
1097 b95479a5 Michael Hanselmann
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1098 b95479a5 Michael Hanselmann
                                     depresult)
1099 b95479a5 Michael Hanselmann
1100 b95479a5 Michael Hanselmann
    return result
1101 b95479a5 Michael Hanselmann
1102 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1103 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1104 be760ba8 Michael Hanselmann

1105 be760ba8 Michael Hanselmann
    """
1106 b80cc518 Michael Hanselmann
    op = opctx.op
1107 b80cc518 Michael Hanselmann
1108 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1109 be760ba8 Michael Hanselmann
1110 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1111 26d3fd2f Michael Hanselmann
1112 be760ba8 Michael Hanselmann
    try:
1113 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1114 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1115 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1116 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1117 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1118 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1119 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1120 9e49dfc5 Michael Hanselmann
1121 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1122 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1123 9e49dfc5 Michael Hanselmann
1124 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1125 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1126 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1127 9e49dfc5 Michael Hanselmann
1128 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1129 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1130 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1131 942e2262 Michael Hanselmann
1132 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1133 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1134 be760ba8 Michael Hanselmann
    except CancelJob:
1135 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1136 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1137 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1138 942e2262 Michael Hanselmann
1139 942e2262 Michael Hanselmann
    except QueueShutdown:
1140 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1141 942e2262 Michael Hanselmann
1142 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1143 942e2262 Michael Hanselmann
1144 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1145 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1146 942e2262 Michael Hanselmann
1147 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1148 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1149 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1150 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1151 be760ba8 Michael Hanselmann
    else:
1152 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1153 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1154 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1155 be760ba8 Michael Hanselmann
1156 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1157 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1158 be760ba8 Michael Hanselmann

1159 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1160 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1161 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1162 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1163 be760ba8 Michael Hanselmann

1164 be760ba8 Michael Hanselmann
    """
1165 be760ba8 Michael Hanselmann
    queue = self.queue
1166 be760ba8 Michael Hanselmann
    job = self.job
1167 be760ba8 Michael Hanselmann
1168 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1169 be760ba8 Michael Hanselmann
1170 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1171 be760ba8 Michael Hanselmann
    try:
1172 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1173 be760ba8 Michael Hanselmann
1174 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1175 c0f6d0d8 Michael Hanselmann
1176 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1177 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1178 75d81fc8 Michael Hanselmann
        return self.FINISHED
1179 66bd7445 Michael Hanselmann
1180 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1181 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1182 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1183 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1184 26d3fd2f Michael Hanselmann
      else:
1185 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1186 26d3fd2f Michael Hanselmann
          _nextop_fn()
1187 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1188 26d3fd2f Michael Hanselmann
1189 b80cc518 Michael Hanselmann
      op = opctx.op
1190 be760ba8 Michael Hanselmann
1191 be760ba8 Michael Hanselmann
      # Consistency check
1192 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1193 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1194 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1195 be760ba8 Michael Hanselmann
1196 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1197 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1198 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1199 be760ba8 Michael Hanselmann
1200 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1201 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1202 26d3fd2f Michael Hanselmann
1203 b95479a5 Michael Hanselmann
      waitjob = None
1204 b95479a5 Michael Hanselmann
1205 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1206 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1207 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1208 30c945d0 Michael Hanselmann
1209 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1210 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1211 5fd6b694 Michael Hanselmann
          # Write to disk
1212 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1213 be760ba8 Michael Hanselmann
1214 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1215 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1216 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1217 b95479a5 Michael Hanselmann
        assert waitjob is None
1218 b95479a5 Michael Hanselmann
1219 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1220 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1221 be760ba8 Michael Hanselmann
1222 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1223 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1224 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1225 be760ba8 Michael Hanselmann
1226 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1227 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1228 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1229 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1230 be760ba8 Michael Hanselmann
1231 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1232 b95479a5 Michael Hanselmann
1233 b95479a5 Michael Hanselmann
          queue.release()
1234 b95479a5 Michael Hanselmann
          try:
1235 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1236 b95479a5 Michael Hanselmann
          finally:
1237 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1238 b95479a5 Michael Hanselmann
1239 b95479a5 Michael Hanselmann
          op.status = op_status
1240 b95479a5 Michael Hanselmann
          op.result = op_result
1241 b95479a5 Michael Hanselmann
1242 b95479a5 Michael Hanselmann
          assert not waitjob
1243 be760ba8 Michael Hanselmann
1244 942e2262 Michael Hanselmann
        if op.status in (constants.OP_STATUS_WAITING,
1245 942e2262 Michael Hanselmann
                         constants.OP_STATUS_QUEUED):
1246 942e2262 Michael Hanselmann
          # waiting: Couldn't get locks in time
1247 942e2262 Michael Hanselmann
          # queued: Queue is shutting down
1248 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1249 be760ba8 Michael Hanselmann
        else:
1250 26d3fd2f Michael Hanselmann
          # Finalize opcode
1251 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1252 be760ba8 Michael Hanselmann
1253 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1254 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1255 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1256 26d3fd2f Michael Hanselmann
          else:
1257 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1258 be760ba8 Michael Hanselmann
1259 942e2262 Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
1260 942e2262 Michael Hanselmann
        # Queue is shutting down
1261 942e2262 Michael Hanselmann
        assert not waitjob
1262 942e2262 Michael Hanselmann
1263 942e2262 Michael Hanselmann
        finalize = False
1264 942e2262 Michael Hanselmann
1265 942e2262 Michael Hanselmann
        # Reset context
1266 942e2262 Michael Hanselmann
        job.cur_opctx = None
1267 942e2262 Michael Hanselmann
1268 942e2262 Michael Hanselmann
        # In no case must the status be finalized here
1269 942e2262 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1270 942e2262 Michael Hanselmann
1271 942e2262 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1272 be760ba8 Michael Hanselmann
        finalize = False
1273 be760ba8 Michael Hanselmann
1274 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1275 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1276 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1277 be760ba8 Michael Hanselmann
1278 26d3fd2f Michael Hanselmann
        # Keep around for another round
1279 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1280 be760ba8 Michael Hanselmann
1281 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1282 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1283 be760ba8 Michael Hanselmann
1284 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1285 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1286 be760ba8 Michael Hanselmann
1287 be760ba8 Michael Hanselmann
      else:
1288 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1289 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1290 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1291 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1292 26d3fd2f Michael Hanselmann
1293 26d3fd2f Michael Hanselmann
        # Reset context
1294 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1295 26d3fd2f Michael Hanselmann
1296 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1297 26d3fd2f Michael Hanselmann
          finalize = False
1298 26d3fd2f Michael Hanselmann
1299 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1300 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1301 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1302 26d3fd2f Michael Hanselmann
1303 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1304 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1305 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1306 26d3fd2f Michael Hanselmann
          finalize = True
1307 be760ba8 Michael Hanselmann
1308 26d3fd2f Michael Hanselmann
          # Consistency check
1309 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1310 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1311 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1312 be760ba8 Michael Hanselmann
1313 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1314 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1315 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1316 26d3fd2f Michael Hanselmann
          finalize = True
1317 26d3fd2f Michael Hanselmann
1318 26d3fd2f Michael Hanselmann
        else:
1319 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1320 26d3fd2f Michael Hanselmann
1321 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1322 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1323 66bd7445 Michael Hanselmann
          finalize = True
1324 66bd7445 Michael Hanselmann
1325 66bd7445 Michael Hanselmann
        if finalize:
1326 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1327 66bd7445 Michael Hanselmann
          job.Finalize()
1328 26d3fd2f Michael Hanselmann
1329 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1330 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1331 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1332 be760ba8 Michael Hanselmann
1333 b95479a5 Michael Hanselmann
        assert not waitjob
1334 b95479a5 Michael Hanselmann
1335 66bd7445 Michael Hanselmann
        if finalize:
1336 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1337 75d81fc8 Michael Hanselmann
          return self.FINISHED
1338 be760ba8 Michael Hanselmann
1339 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1340 b95479a5 Michael Hanselmann
1341 75d81fc8 Michael Hanselmann
      if waitjob:
1342 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1343 75d81fc8 Michael Hanselmann
      else:
1344 75d81fc8 Michael Hanselmann
        return self.DEFER
1345 be760ba8 Michael Hanselmann
    finally:
1346 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1347 be760ba8 Michael Hanselmann
      queue.release()
1348 be760ba8 Michael Hanselmann
1349 be760ba8 Michael Hanselmann
1350 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
1351 df5a5730 Michael Hanselmann
  """Looks at a result from L{_JobProcessor} for a job.
1352 df5a5730 Michael Hanselmann

1353 df5a5730 Michael Hanselmann
  To be used in a L{_JobQueueWorker}.
1354 df5a5730 Michael Hanselmann

1355 df5a5730 Michael Hanselmann
  """
1356 df5a5730 Michael Hanselmann
  if result == _JobProcessor.FINISHED:
1357 df5a5730 Michael Hanselmann
    # Notify waiting jobs
1358 df5a5730 Michael Hanselmann
    depmgr.NotifyWaiters(job.id)
1359 df5a5730 Michael Hanselmann
1360 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.DEFER:
1361 df5a5730 Michael Hanselmann
    # Schedule again
1362 df5a5730 Michael Hanselmann
    raise workerpool.DeferTask(priority=job.CalcPriority())
1363 df5a5730 Michael Hanselmann
1364 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.WAITDEP:
1365 df5a5730 Michael Hanselmann
    # No-op, dependency manager will re-schedule
1366 df5a5730 Michael Hanselmann
    pass
1367 df5a5730 Michael Hanselmann
1368 df5a5730 Michael Hanselmann
  else:
1369 df5a5730 Michael Hanselmann
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1370 df5a5730 Michael Hanselmann
                                 (result, ))
1371 df5a5730 Michael Hanselmann
1372 df5a5730 Michael Hanselmann
1373 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
1374 031a3e57 Michael Hanselmann
  """The actual job workers.
1375 031a3e57 Michael Hanselmann

1376 031a3e57 Michael Hanselmann
  """
1377 b459a848 Andrea Spadaccini
  def RunTask(self, job): # pylint: disable=W0221
1378 e2715f69 Michael Hanselmann
    """Job executor.
1379 e2715f69 Michael Hanselmann

1380 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1381 ea03467c Iustin Pop
    @param job: the job to be processed
1382 ea03467c Iustin Pop

1383 e2715f69 Michael Hanselmann
    """
1384 f8a4adfa Michael Hanselmann
    assert job.writable, "Expected writable job"
1385 f8a4adfa Michael Hanselmann
1386 b95479a5 Michael Hanselmann
    # Ensure only one worker is active on a single job. If a job registers for
1387 b95479a5 Michael Hanselmann
    # a dependency job, and the other job notifies before the first worker is
1388 b95479a5 Michael Hanselmann
    # done, the job can end up in the tasklist more than once.
1389 b95479a5 Michael Hanselmann
    job.processor_lock.acquire()
1390 b95479a5 Michael Hanselmann
    try:
1391 b95479a5 Michael Hanselmann
      return self._RunTaskInner(job)
1392 b95479a5 Michael Hanselmann
    finally:
1393 b95479a5 Michael Hanselmann
      job.processor_lock.release()
1394 b95479a5 Michael Hanselmann
1395 b95479a5 Michael Hanselmann
  def _RunTaskInner(self, job):
1396 b95479a5 Michael Hanselmann
    """Executes a job.
1397 b95479a5 Michael Hanselmann

1398 b95479a5 Michael Hanselmann
    Must be called with per-job lock acquired.
1399 b95479a5 Michael Hanselmann

1400 b95479a5 Michael Hanselmann
    """
1401 be760ba8 Michael Hanselmann
    queue = job.queue
1402 be760ba8 Michael Hanselmann
    assert queue == self.pool.queue
1403 be760ba8 Michael Hanselmann
1404 0aeeb6e3 Michael Hanselmann
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1405 0aeeb6e3 Michael Hanselmann
    setname_fn(None)
1406 daba67c7 Michael Hanselmann
1407 be760ba8 Michael Hanselmann
    proc = mcpu.Processor(queue.context, job.id)
1408 be760ba8 Michael Hanselmann
1409 0aeeb6e3 Michael Hanselmann
    # Create wrapper for setting thread name
1410 0aeeb6e3 Michael Hanselmann
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1411 0aeeb6e3 Michael Hanselmann
                                    proc.ExecOpCode)
1412 0aeeb6e3 Michael Hanselmann
1413 df5a5730 Michael Hanselmann
    _EvaluateJobProcessorResult(queue.depmgr, job,
1414 df5a5730 Michael Hanselmann
                                _JobProcessor(queue, wrap_execop_fn, job)())
1415 75d81fc8 Michael Hanselmann
1416 0aeeb6e3 Michael Hanselmann
  @staticmethod
1417 0aeeb6e3 Michael Hanselmann
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1418 0aeeb6e3 Michael Hanselmann
    """Updates the worker thread name to include a short summary of the opcode.
1419 0aeeb6e3 Michael Hanselmann

1420 0aeeb6e3 Michael Hanselmann
    @param setname_fn: Callable setting worker thread name
1421 0aeeb6e3 Michael Hanselmann
    @param execop_fn: Callable for executing opcode (usually
1422 0aeeb6e3 Michael Hanselmann
                      L{mcpu.Processor.ExecOpCode})
1423 0aeeb6e3 Michael Hanselmann

1424 0aeeb6e3 Michael Hanselmann
    """
1425 0aeeb6e3 Michael Hanselmann
    setname_fn(op)
1426 0aeeb6e3 Michael Hanselmann
    try:
1427 0aeeb6e3 Michael Hanselmann
      return execop_fn(op, *args, **kwargs)
1428 0aeeb6e3 Michael Hanselmann
    finally:
1429 0aeeb6e3 Michael Hanselmann
      setname_fn(None)
1430 0aeeb6e3 Michael Hanselmann
1431 0aeeb6e3 Michael Hanselmann
  @staticmethod
1432 0aeeb6e3 Michael Hanselmann
  def _GetWorkerName(job, op):
1433 0aeeb6e3 Michael Hanselmann
    """Sets the worker thread name.
1434 0aeeb6e3 Michael Hanselmann

1435 0aeeb6e3 Michael Hanselmann
    @type job: L{_QueuedJob}
1436 0aeeb6e3 Michael Hanselmann
    @type op: L{opcodes.OpCode}
1437 0aeeb6e3 Michael Hanselmann

1438 0aeeb6e3 Michael Hanselmann
    """
1439 0aeeb6e3 Michael Hanselmann
    parts = ["Job%s" % job.id]
1440 0aeeb6e3 Michael Hanselmann
1441 0aeeb6e3 Michael Hanselmann
    if op:
1442 0aeeb6e3 Michael Hanselmann
      parts.append(op.TinySummary())
1443 0aeeb6e3 Michael Hanselmann
1444 0aeeb6e3 Michael Hanselmann
    return "/".join(parts)
1445 0aeeb6e3 Michael Hanselmann
1446 e2715f69 Michael Hanselmann
1447 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
1448 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
1449 ea03467c Iustin Pop

1450 ea03467c Iustin Pop
  """
1451 5bdce580 Michael Hanselmann
  def __init__(self, queue):
1452 0aeeb6e3 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("Jq",
1453 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
1454 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
1455 5bdce580 Michael Hanselmann
    self.queue = queue
1456 e2715f69 Michael Hanselmann
1457 e2715f69 Michael Hanselmann
1458 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1459 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1460 b95479a5 Michael Hanselmann

1461 b95479a5 Michael Hanselmann
  """
1462 b95479a5 Michael Hanselmann
  (WAIT,
1463 b95479a5 Michael Hanselmann
   ERROR,
1464 b95479a5 Michael Hanselmann
   CANCEL,
1465 b95479a5 Michael Hanselmann
   CONTINUE,
1466 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1467 b95479a5 Michael Hanselmann
1468 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1469 b95479a5 Michael Hanselmann
    """Initializes this class.
1470 b95479a5 Michael Hanselmann

1471 b95479a5 Michael Hanselmann
    """
1472 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1473 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1474 b95479a5 Michael Hanselmann
1475 b95479a5 Michael Hanselmann
    self._waiters = {}
1476 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1477 b95479a5 Michael Hanselmann
1478 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1479 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1480 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1481 fcb21ad7 Michael Hanselmann

1482 fcb21ad7 Michael Hanselmann
    @type requested: set
1483 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1484 fcb21ad7 Michael Hanselmann

1485 fcb21ad7 Michael Hanselmann
    """
1486 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1487 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1488 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1489 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1490 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1491 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1492 fcb21ad7 Michael Hanselmann
            if waiters]
1493 fcb21ad7 Michael Hanselmann
1494 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1495 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1496 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1497 b95479a5 Michael Hanselmann

1498 b95479a5 Michael Hanselmann
    """
1499 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1500 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1501 b95479a5 Michael Hanselmann
1502 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1503 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1504 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1505 b95479a5 Michael Hanselmann

1506 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1507 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1508 b95479a5 Michael Hanselmann

1509 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1510 b95479a5 Michael Hanselmann
    @param job: Job object
1511 76b62028 Iustin Pop
    @type dep_job_id: int
1512 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1513 b95479a5 Michael Hanselmann
    @type dep_status: list
1514 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1515 b95479a5 Michael Hanselmann

1516 b95479a5 Michael Hanselmann
    """
1517 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1518 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1519 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1520 b95479a5 Michael Hanselmann
1521 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1522 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1523 b95479a5 Michael Hanselmann
1524 b95479a5 Michael Hanselmann
    # Get status of dependency job
1525 b95479a5 Michael Hanselmann
    try:
1526 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1527 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1528 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1529 b95479a5 Michael Hanselmann
1530 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1531 b95479a5 Michael Hanselmann
1532 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1533 b95479a5 Michael Hanselmann
1534 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1535 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1536 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1537 b95479a5 Michael Hanselmann
      return (self.WAIT,
1538 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1539 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1540 b95479a5 Michael Hanselmann
1541 b95479a5 Michael Hanselmann
    # Remove from waiters list
1542 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1543 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1544 b95479a5 Michael Hanselmann
1545 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1546 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1547 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1548 b95479a5 Michael Hanselmann
1549 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1550 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1551 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1552 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1553 b95479a5 Michael Hanselmann
1554 b95479a5 Michael Hanselmann
    else:
1555 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1556 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1557 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1558 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1559 b95479a5 Michael Hanselmann
1560 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1561 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1562 37d76f1e Michael Hanselmann

1563 37d76f1e Michael Hanselmann
    """
1564 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1565 37d76f1e Michael Hanselmann
                   if not waiters]:
1566 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1567 37d76f1e Michael Hanselmann
1568 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1569 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1570 b95479a5 Michael Hanselmann

1571 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1572 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1573 76b62028 Iustin Pop
    @type job_id: int
1574 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1575 b95479a5 Michael Hanselmann

1576 b95479a5 Michael Hanselmann
    """
1577 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1578 b95479a5 Michael Hanselmann
1579 37d76f1e Michael Hanselmann
    self._lock.acquire()
1580 37d76f1e Michael Hanselmann
    try:
1581 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1582 37d76f1e Michael Hanselmann
1583 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1584 37d76f1e Michael Hanselmann
    finally:
1585 37d76f1e Michael Hanselmann
      self._lock.release()
1586 37d76f1e Michael Hanselmann
1587 b95479a5 Michael Hanselmann
    if jobs:
1588 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1589 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1590 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1591 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1592 b95479a5 Michael Hanselmann
1593 b95479a5 Michael Hanselmann
1594 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1595 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1596 ea03467c Iustin Pop

1597 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1598 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1599 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1600 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1601 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1602 ea03467c Iustin Pop

1603 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1604 f1da30e6 Michael Hanselmann

1605 6c881c52 Iustin Pop
  Example::
1606 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1607 6c881c52 Iustin Pop
    @_RequireOpenQueue
1608 6c881c52 Iustin Pop
    def Example(self):
1609 6c881c52 Iustin Pop
      pass
1610 db37da70 Michael Hanselmann

1611 6c881c52 Iustin Pop
  """
1612 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1613 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1614 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1615 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1616 6c881c52 Iustin Pop
  return wrapper
1617 db37da70 Michael Hanselmann
1618 db37da70 Michael Hanselmann
1619 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1620 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1621 c8d0be94 Michael Hanselmann

1622 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1623 c8d0be94 Michael Hanselmann

1624 c8d0be94 Michael Hanselmann
  """
1625 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1626 c8d0be94 Michael Hanselmann
    """Wrapper function.
1627 c8d0be94 Michael Hanselmann

1628 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1629 c8d0be94 Michael Hanselmann

1630 c8d0be94 Michael Hanselmann
    """
1631 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1632 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1633 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1634 c8d0be94 Michael Hanselmann
    if self._drained:
1635 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1636 6d5ea385 Michael Hanselmann
1637 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1638 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1639 6d5ea385 Michael Hanselmann
1640 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1641 c8d0be94 Michael Hanselmann
  return wrapper
1642 c8d0be94 Michael Hanselmann
1643 c8d0be94 Michael Hanselmann
1644 6c881c52 Iustin Pop
class JobQueue(object):
1645 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1646 db37da70 Michael Hanselmann

1647 6c881c52 Iustin Pop
  """
1648 85f03e0d Michael Hanselmann
  def __init__(self, context):
1649 ea03467c Iustin Pop
    """Constructor for JobQueue.
1650 ea03467c Iustin Pop

1651 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
1652 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
1653 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
1654 ea03467c Iustin Pop
    running).
1655 ea03467c Iustin Pop

1656 ea03467c Iustin Pop
    @type context: GanetiContext
1657 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
1658 ea03467c Iustin Pop
        data and other ganeti objects
1659 ea03467c Iustin Pop

1660 ea03467c Iustin Pop
    """
1661 5bdce580 Michael Hanselmann
    self.context = context
1662 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
1663 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
1664 f1da30e6 Michael Hanselmann
1665 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
1666 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
1667 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
1668 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
1669 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
1670 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
1671 ebb80afa Guido Trotter
1672 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
1673 ebb80afa Guido Trotter
    self.release = self._lock.release
1674 85f03e0d Michael Hanselmann
1675 6d5ea385 Michael Hanselmann
    # Accept jobs by default
1676 6d5ea385 Michael Hanselmann
    self._accepting_jobs = True
1677 6d5ea385 Michael Hanselmann
1678 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
1679 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
1680 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1681 f1da30e6 Michael Hanselmann
1682 04ab05ce Michael Hanselmann
    # Read serial file
1683 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
1684 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
1685 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
1686 c4beba1c Iustin Pop
1687 23752136 Michael Hanselmann
    # Get initial list of nodes
1688 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
1689 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
1690 59303563 Iustin Pop
                       if n.master_candidate)
1691 8e00939c Michael Hanselmann
1692 8e00939c Michael Hanselmann
    # Remove master node
1693 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
1694 23752136 Michael Hanselmann
1695 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
1696 23752136 Michael Hanselmann
1697 6d5ea385 Michael Hanselmann
    self._queue_size = None
1698 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1699 6d5ea385 Michael Hanselmann
    assert ht.TInt(self._queue_size)
1700 ff699aa9 Michael Hanselmann
    self._drained = jstore.CheckDrainFlag()
1701 20571a26 Guido Trotter
1702 b95479a5 Michael Hanselmann
    # Job dependencies
1703 b95479a5 Michael Hanselmann
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1704 b95479a5 Michael Hanselmann
                                        self._EnqueueJobs)
1705 fcb21ad7 Michael Hanselmann
    self.context.glm.AddToLockMonitor(self.depmgr)
1706 b95479a5 Michael Hanselmann
1707 85f03e0d Michael Hanselmann
    # Setup worker pool
1708 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
1709 85f03e0d Michael Hanselmann
    try:
1710 de9d02c7 Michael Hanselmann
      self._InspectQueue()
1711 de9d02c7 Michael Hanselmann
    except:
1712 de9d02c7 Michael Hanselmann
      self._wpool.TerminateWorkers()
1713 de9d02c7 Michael Hanselmann
      raise
1714 711b5124 Michael Hanselmann
1715 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1716 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
1717 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
1718 de9d02c7 Michael Hanselmann
    """Loads the whole job queue and resumes unfinished jobs.
1719 de9d02c7 Michael Hanselmann

1720 de9d02c7 Michael Hanselmann
    This function needs the lock here because WorkerPool.AddTask() may start a
1721 de9d02c7 Michael Hanselmann
    job while we're still doing our work.
1722 711b5124 Michael Hanselmann

1723 de9d02c7 Michael Hanselmann
    """
1724 de9d02c7 Michael Hanselmann
    logging.info("Inspecting job queue")
1725 de9d02c7 Michael Hanselmann
1726 7b5c4a69 Michael Hanselmann
    restartjobs = []
1727 7b5c4a69 Michael Hanselmann
1728 de9d02c7 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked()
1729 de9d02c7 Michael Hanselmann
    jobs_count = len(all_job_ids)
1730 de9d02c7 Michael Hanselmann
    lastinfo = time.time()
1731 de9d02c7 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1732 de9d02c7 Michael Hanselmann
      # Give an update every 1000 jobs or 10 seconds
1733 de9d02c7 Michael Hanselmann
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1734 de9d02c7 Michael Hanselmann
          idx == (jobs_count - 1)):
1735 de9d02c7 Michael Hanselmann
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1736 de9d02c7 Michael Hanselmann
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1737 711b5124 Michael Hanselmann
        lastinfo = time.time()
1738 94ed59a5 Iustin Pop
1739 de9d02c7 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1740 85f03e0d Michael Hanselmann
1741 de9d02c7 Michael Hanselmann
      # a failure in loading the job can cause 'None' to be returned
1742 de9d02c7 Michael Hanselmann
      if job is None:
1743 de9d02c7 Michael Hanselmann
        continue
1744 85f03e0d Michael Hanselmann
1745 de9d02c7 Michael Hanselmann
      status = job.CalcStatus()
1746 711b5124 Michael Hanselmann
1747 320d1daf Michael Hanselmann
      if status == constants.JOB_STATUS_QUEUED:
1748 7b5c4a69 Michael Hanselmann
        restartjobs.append(job)
1749 de9d02c7 Michael Hanselmann
1750 de9d02c7 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING,
1751 47099cd1 Michael Hanselmann
                      constants.JOB_STATUS_WAITING,
1752 de9d02c7 Michael Hanselmann
                      constants.JOB_STATUS_CANCELING):
1753 de9d02c7 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
1754 320d1daf Michael Hanselmann
1755 47099cd1 Michael Hanselmann
        if status == constants.JOB_STATUS_WAITING:
1756 320d1daf Michael Hanselmann
          # Restart job
1757 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1758 320d1daf Michael Hanselmann
          restartjobs.append(job)
1759 320d1daf Michael Hanselmann
        else:
1760 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1761 320d1daf Michael Hanselmann
                                "Unclean master daemon shutdown")
1762 45df0793 Michael Hanselmann
          job.Finalize()
1763 320d1daf Michael Hanselmann
1764 de9d02c7 Michael Hanselmann
        self.UpdateJobUnlocked(job)
1765 de9d02c7 Michael Hanselmann
1766 7b5c4a69 Michael Hanselmann
    if restartjobs:
1767 7b5c4a69 Michael Hanselmann
      logging.info("Restarting %s jobs", len(restartjobs))
1768 75d81fc8 Michael Hanselmann
      self._EnqueueJobsUnlocked(restartjobs)
1769 7b5c4a69 Michael Hanselmann
1770 de9d02c7 Michael Hanselmann
    logging.info("Job queue inspection finished")
1771 85f03e0d Michael Hanselmann
1772 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
1773 fb1ffbca Michael Hanselmann
    """Gets RPC runner with context.
1774 fb1ffbca Michael Hanselmann

1775 fb1ffbca Michael Hanselmann
    """
1776 fb1ffbca Michael Hanselmann
    return rpc.JobQueueRunner(self.context, address_list)
1777 fb1ffbca Michael Hanselmann
1778 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1779 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1780 99aabbed Iustin Pop
  def AddNode(self, node):
1781 99aabbed Iustin Pop
    """Register a new node with the queue.
1782 99aabbed Iustin Pop

1783 99aabbed Iustin Pop
    @type node: L{objects.Node}
1784 99aabbed Iustin Pop
    @param node: the node object to be added
1785 99aabbed Iustin Pop

1786 99aabbed Iustin Pop
    """
1787 99aabbed Iustin Pop
    node_name = node.name
1788 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1789 23752136 Michael Hanselmann
1790 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1791 fb1ffbca Michael Hanselmann
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1792 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1793 c8457ce7 Iustin Pop
    if msg:
1794 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1795 c8457ce7 Iustin Pop
                      node_name, msg)
1796 23752136 Michael Hanselmann
1797 59303563 Iustin Pop
    if not node.master_candidate:
1798 59303563 Iustin Pop
      # remove if existing, ignoring errors
1799 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1800 59303563 Iustin Pop
      # and skip the replication of the job ids
1801 59303563 Iustin Pop
      return
1802 59303563 Iustin Pop
1803 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1804 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1805 23752136 Michael Hanselmann
1806 d2e03a33 Michael Hanselmann
    # Upload current serial file
1807 e2b4a7ba Michael Hanselmann
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1808 d2e03a33 Michael Hanselmann
1809 fb1ffbca Michael Hanselmann
    # Static address list
1810 fb1ffbca Michael Hanselmann
    addrs = [node.primary_ip]
1811 fb1ffbca Michael Hanselmann
1812 d2e03a33 Michael Hanselmann
    for file_name in files:
1813 9f774ee8 Michael Hanselmann
      # Read file content
1814 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1815 9f774ee8 Michael Hanselmann
1816 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1817 cffbbae7 Michael Hanselmann
                             file_name, content)
1818 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1819 c8457ce7 Iustin Pop
      if msg:
1820 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1821 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1822 d2e03a33 Michael Hanselmann
1823 be6c403e Michael Hanselmann
    # Set queue drained flag
1824 be6c403e Michael Hanselmann
    result = \
1825 be6c403e Michael Hanselmann
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1826 be6c403e Michael Hanselmann
                                                       self._drained)
1827 be6c403e Michael Hanselmann
    msg = result[node_name].fail_msg
1828 be6c403e Michael Hanselmann
    if msg:
1829 be6c403e Michael Hanselmann
      logging.error("Failed to set queue drained flag on node %s: %s",
1830 be6c403e Michael Hanselmann
                    node_name, msg)
1831 be6c403e Michael Hanselmann
1832 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1833 d2e03a33 Michael Hanselmann
1834 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1835 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1836 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1837 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1838 ea03467c Iustin Pop

1839 ea03467c Iustin Pop
    @type node_name: str
1840 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1841 ea03467c Iustin Pop

1842 ea03467c Iustin Pop
    """
1843 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1844 23752136 Michael Hanselmann
1845 7e950d31 Iustin Pop
  @staticmethod
1846 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1847 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1848 ea03467c Iustin Pop

1849 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1850 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1851 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1852 ea03467c Iustin Pop

1853 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1854 ea03467c Iustin Pop
    @type nodes: list
1855 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1856 ea03467c Iustin Pop
    @type failmsg: str
1857 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1858 ea03467c Iustin Pop

1859 ea03467c Iustin Pop
    """
1860 e74798c1 Michael Hanselmann
    failed = []
1861 e74798c1 Michael Hanselmann
    success = []
1862 e74798c1 Michael Hanselmann
1863 e74798c1 Michael Hanselmann
    for node in nodes:
1864 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1865 c8457ce7 Iustin Pop
      if msg:
1866 e74798c1 Michael Hanselmann
        failed.append(node)
1867 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1868 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1869 c8457ce7 Iustin Pop
      else:
1870 c8457ce7 Iustin Pop
        success.append(node)
1871 e74798c1 Michael Hanselmann
1872 e74798c1 Michael Hanselmann
    # +1 for the master node
1873 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1874 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1875 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1876 e74798c1 Michael Hanselmann
1877 99aabbed Iustin Pop
  def _GetNodeIp(self):
1878 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1879 99aabbed Iustin Pop

1880 ea03467c Iustin Pop
    @rtype: (list, list)
1881 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1882 ea03467c Iustin Pop
        names and the second one with the node addresses
1883 ea03467c Iustin Pop

1884 99aabbed Iustin Pop
    """
1885 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1886 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1887 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1888 99aabbed Iustin Pop
    return name_list, addr_list
1889 99aabbed Iustin Pop
1890 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1891 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1892 8e00939c Michael Hanselmann

1893 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1894 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1895 ea03467c Iustin Pop

1896 ea03467c Iustin Pop
    @type file_name: str
1897 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1898 ea03467c Iustin Pop
    @type data: str
1899 ea03467c Iustin Pop
    @param data: the new contents of the file
1900 4c36bdf5 Guido Trotter
    @type replicate: boolean
1901 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1902 ea03467c Iustin Pop

1903 8e00939c Michael Hanselmann
    """
1904 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1905 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1906 fe05a931 Michele Tartara
                    gid=getents.daemons_gid,
1907 fe05a931 Michele Tartara
                    mode=constants.JOB_QUEUE_FILES_PERMS)
1908 8e00939c Michael Hanselmann
1909 4c36bdf5 Guido Trotter
    if replicate:
1910 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1911 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1912 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1913 23752136 Michael Hanselmann
1914 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1915 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1916 ea03467c Iustin Pop

1917 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1918 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1919 ea03467c Iustin Pop

1920 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1921 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1922 ea03467c Iustin Pop

1923 ea03467c Iustin Pop
    """
1924 dd875d32 Michael Hanselmann
    # Rename them locally
1925 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1926 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1927 abc1f2ce Michael Hanselmann
1928 dd875d32 Michael Hanselmann
    # ... and on all nodes
1929 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1930 fb1ffbca Michael Hanselmann
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1931 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1932 abc1f2ce Michael Hanselmann
1933 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1934 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1935 f1da30e6 Michael Hanselmann

1936 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1937 f1da30e6 Michael Hanselmann

1938 009e73d0 Iustin Pop
    @type count: integer
1939 009e73d0 Iustin Pop
    @param count: how many serials to return
1940 76b62028 Iustin Pop
    @rtype: list of int
1941 76b62028 Iustin Pop
    @return: a list of job identifiers.
1942 f1da30e6 Michael Hanselmann

1943 f1da30e6 Michael Hanselmann
    """
1944 2c9fa1ff Iustin Pop
    assert ht.TNonNegativeInt(count)
1945 719f8fba Michael Hanselmann
1946 f1da30e6 Michael Hanselmann
    # New number
1947 009e73d0 Iustin Pop
    serial = self._last_serial + count
1948 f1da30e6 Michael Hanselmann
1949 f1da30e6 Michael Hanselmann
    # Write to file
1950 e2b4a7ba Michael Hanselmann
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1951 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1952 f1da30e6 Michael Hanselmann
1953 1410a389 Michael Hanselmann
    result = [jstore.FormatJobID(v)
1954 3c88bf36 Michael Hanselmann
              for v in range(self._last_serial + 1, serial + 1)]
1955 3c88bf36 Michael Hanselmann
1956 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1957 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1958 f1da30e6 Michael Hanselmann
1959 3c88bf36 Michael Hanselmann
    assert len(result) == count
1960 3c88bf36 Michael Hanselmann
1961 009e73d0 Iustin Pop
    return result
1962 f1da30e6 Michael Hanselmann
1963 85f03e0d Michael Hanselmann
  @staticmethod
1964 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1965 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1966 ea03467c Iustin Pop

1967 ea03467c Iustin Pop
    @type job_id: str
1968 ea03467c Iustin Pop
    @param job_id: the job identifier
1969 ea03467c Iustin Pop
    @rtype: str
1970 ea03467c Iustin Pop
    @return: the path to the job file
1971 ea03467c Iustin Pop

1972 ea03467c Iustin Pop
    """
1973 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1974 f1da30e6 Michael Hanselmann
1975 1410a389 Michael Hanselmann
  @staticmethod
1976 1410a389 Michael Hanselmann
  def _GetArchivedJobPath(job_id):
1977 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1978 ea03467c Iustin Pop

1979 ea03467c Iustin Pop
    @type job_id: str
1980 ea03467c Iustin Pop
    @param job_id: the job identifier
1981 ea03467c Iustin Pop
    @rtype: str
1982 ea03467c Iustin Pop
    @return: the path to the archived job file
1983 ea03467c Iustin Pop

1984 ea03467c Iustin Pop
    """
1985 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1986 1410a389 Michael Hanselmann
                          jstore.GetArchiveDirectory(job_id),
1987 1410a389 Michael Hanselmann
                          "job-%s" % job_id)
1988 0cb94105 Michael Hanselmann
1989 cb66225d Michael Hanselmann
  @staticmethod
1990 0422250e Michael Hanselmann
  def _DetermineJobDirectories(archived):
1991 bb921668 Michael Hanselmann
    """Build list of directories containing job files.
1992 bb921668 Michael Hanselmann

1993 bb921668 Michael Hanselmann
    @type archived: bool
1994 bb921668 Michael Hanselmann
    @param archived: Whether to include directories for archived jobs
1995 bb921668 Michael Hanselmann
    @rtype: list
1996 bb921668 Michael Hanselmann

1997 bb921668 Michael Hanselmann
    """
1998 0422250e Michael Hanselmann
    result = [pathutils.QUEUE_DIR]
1999 0422250e Michael Hanselmann
2000 0422250e Michael Hanselmann
    if archived:
2001 0422250e Michael Hanselmann
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2002 0422250e Michael Hanselmann
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
2003 0422250e Michael Hanselmann
                        utils.ListVisibleFiles(archive_path)))
2004 0422250e Michael Hanselmann
2005 0422250e Michael Hanselmann
    return result
2006 0422250e Michael Hanselmann
2007 0422250e Michael Hanselmann
  @classmethod
2008 0422250e Michael Hanselmann
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2009 911a495b Iustin Pop
    """Return all known job IDs.
2010 911a495b Iustin Pop

2011 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
2012 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
2013 ac0930b9 Iustin Pop
    extra IDs).
2014 ac0930b9 Iustin Pop

2015 85a1c57d Guido Trotter
    @type sort: boolean
2016 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
2017 ea03467c Iustin Pop
    @rtype: list
2018 ea03467c Iustin Pop
    @return: the list of job IDs
2019 ea03467c Iustin Pop

2020 911a495b Iustin Pop
    """
2021 85a1c57d Guido Trotter
    jlist = []
2022 0422250e Michael Hanselmann
2023 0422250e Michael Hanselmann
    for path in cls._DetermineJobDirectories(archived):
2024 0422250e Michael Hanselmann
      for filename in utils.ListVisibleFiles(path):
2025 0422250e Michael Hanselmann
        m = constants.JOB_FILE_RE.match(filename)
2026 0422250e Michael Hanselmann
        if m:
2027 0422250e Michael Hanselmann
          jlist.append(int(m.group(1)))
2028 0422250e Michael Hanselmann
2029 85a1c57d Guido Trotter
    if sort:
2030 76b62028 Iustin Pop
      jlist.sort()
2031 f0d874fe Iustin Pop
    return jlist
2032 911a495b Iustin Pop
2033 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
2034 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
2035 ea03467c Iustin Pop

2036 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
2037 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
2038 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
2039 ea03467c Iustin Pop

2040 76b62028 Iustin Pop
    @type job_id: int
2041 ea03467c Iustin Pop
    @param job_id: the job id
2042 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
2043 ea03467c Iustin Pop
    @return: either None or the job object
2044 ea03467c Iustin Pop

2045 ea03467c Iustin Pop
    """
2046 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
2047 5685c1a5 Michael Hanselmann
    if job:
2048 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
2049 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
2050 5685c1a5 Michael Hanselmann
      return job
2051 ac0930b9 Iustin Pop
2052 3d6c5566 Guido Trotter
    try:
2053 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
2054 aa9f8167 Iustin Pop
      if job is None:
2055 aa9f8167 Iustin Pop
        return job
2056 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
2057 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
2058 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
2059 3d6c5566 Guido Trotter
      if old_path == new_path:
2060 3d6c5566 Guido Trotter
        # job already archived (future case)
2061 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
2062 3d6c5566 Guido Trotter
      else:
2063 3d6c5566 Guido Trotter
        # non-archived case
2064 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
2065 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
2066 3d6c5566 Guido Trotter
      return None
2067 162c8636 Guido Trotter
2068 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
2069 c0f6d0d8 Michael Hanselmann
2070 162c8636 Guido Trotter
    self._memcache[job_id] = job
2071 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
2072 162c8636 Guido Trotter
    return job
2073 162c8636 Guido Trotter
2074 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2075 162c8636 Guido Trotter
    """Load the given job file from disk.
2076 162c8636 Guido Trotter

2077 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2078 162c8636 Guido Trotter

2079 76b62028 Iustin Pop
    @type job_id: int
2080 162c8636 Guido Trotter
    @param job_id: job identifier
2081 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2082 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2083 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
2084 162c8636 Guido Trotter
    @return: either None or the job object
2085 162c8636 Guido Trotter

2086 162c8636 Guido Trotter
    """
2087 8a3cd185 Michael Hanselmann
    path_functions = [(self._GetJobPath, False)]
2088 194c8ca4 Michael Hanselmann
2089 194c8ca4 Michael Hanselmann
    if try_archived:
2090 8a3cd185 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, True))
2091 194c8ca4 Michael Hanselmann
2092 194c8ca4 Michael Hanselmann
    raw_data = None
2093 8a3cd185 Michael Hanselmann
    archived = None
2094 194c8ca4 Michael Hanselmann
2095 8a3cd185 Michael Hanselmann
    for (fn, archived) in path_functions:
2096 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
2097 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
2098 194c8ca4 Michael Hanselmann
      try:
2099 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
2100 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
2101 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
2102 194c8ca4 Michael Hanselmann
          raise
2103 194c8ca4 Michael Hanselmann
      else:
2104 194c8ca4 Michael Hanselmann
        break
2105 194c8ca4 Michael Hanselmann
2106 194c8ca4 Michael Hanselmann
    if not raw_data:
2107 194c8ca4 Michael Hanselmann
      return None
2108 13998ef2 Michael Hanselmann
2109 c0f6d0d8 Michael Hanselmann
    if writable is None:
2110 8a3cd185 Michael Hanselmann
      writable = not archived
2111 c0f6d0d8 Michael Hanselmann
2112 94ed59a5 Iustin Pop
    try:
2113 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2114 8a3cd185 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable, archived)
2115 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2116 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2117 94ed59a5 Iustin Pop
2118 ac0930b9 Iustin Pop
    return job
2119 f1da30e6 Michael Hanselmann
2120 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2121 0f9c08dc Guido Trotter
    """Load the given job file from disk.
2122 0f9c08dc Guido Trotter

2123 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2124 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
2125 0f9c08dc Guido Trotter
    exception is logged.
2126 0f9c08dc Guido Trotter

2127 76b62028 Iustin Pop
    @type job_id: int
2128 0f9c08dc Guido Trotter
    @param job_id: job identifier
2129 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2130 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2131 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
2132 0f9c08dc Guido Trotter
    @return: either None or the job object
2133 0f9c08dc Guido Trotter

2134 0f9c08dc Guido Trotter
    """
2135 0f9c08dc Guido Trotter
    try:
2136 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2137 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
2138 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
2139 0f9c08dc Guido Trotter
      return None
2140 0f9c08dc Guido Trotter
2141 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2142 20571a26 Guido Trotter
    """Update the queue size.
2143 20571a26 Guido Trotter

2144 20571a26 Guido Trotter
    """
2145 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2146 20571a26 Guido Trotter
2147 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2148 20571a26 Guido Trotter
  @_RequireOpenQueue
2149 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
2150 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
2151 3ccafd0e Iustin Pop

2152 ea03467c Iustin Pop
    @type drain_flag: boolean
2153 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
2154 ea03467c Iustin Pop

2155 3ccafd0e Iustin Pop
    """
2156 be6c403e Michael Hanselmann
    # Change flag locally
2157 ff699aa9 Michael Hanselmann
    jstore.SetDrainFlag(drain_flag)
2158 20571a26 Guido Trotter
2159 20571a26 Guido Trotter
    self._drained = drain_flag
2160 20571a26 Guido Trotter
2161 be6c403e Michael Hanselmann
    # ... and on all nodes
2162 be6c403e Michael Hanselmann
    (names, addrs) = self._GetNodeIp()
2163 be6c403e Michael Hanselmann
    result = \
2164 be6c403e Michael Hanselmann
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2165 be6c403e Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
2166 be6c403e Michael Hanselmann
                         "Setting queue drain flag to %s" % drain_flag)
2167 be6c403e Michael Hanselmann
2168 3ccafd0e Iustin Pop
    return True
2169 3ccafd0e Iustin Pop
2170 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2171 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
2172 85f03e0d Michael Hanselmann
    """Create and store a new job.
2173 f1da30e6 Michael Hanselmann

2174 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
2175 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
2176 c3f0a12f Iustin Pop

2177 009e73d0 Iustin Pop
    @type job_id: job ID
2178 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
2179 c3f0a12f Iustin Pop
    @type ops: list
2180 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
2181 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
2182 7beb1e53 Guido Trotter
    @return: the job object to be queued
2183 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2184 e71c8147 Michael Hanselmann
    @raise errors.GenericError: If an opcode is not valid
2185 c3f0a12f Iustin Pop

2186 c3f0a12f Iustin Pop
    """
2187 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2188 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
2189 f87b405e Michael Hanselmann
2190 c0f6d0d8 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops, True)
2191 f1da30e6 Michael Hanselmann
2192 e71c8147 Michael Hanselmann
    for idx, op in enumerate(job.ops):
2193 42d49574 Michael Hanselmann
      # Check priority
2194 e71c8147 Michael Hanselmann
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2195 e71c8147 Michael Hanselmann
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2196 e71c8147 Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2197 e71c8147 Michael Hanselmann
                                  " are %s" % (idx, op.priority, allowed))
2198 e71c8147 Michael Hanselmann
2199 42d49574 Michael Hanselmann
      # Check job dependencies
2200 580b1fdd Jose A. Lopes
      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
2201 580b1fdd Jose A. Lopes
      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
2202 b247c6fc Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2203 b247c6fc Michael Hanselmann
                                  " match %s: %s" %
2204 580b1fdd Jose A. Lopes
                                  (idx, opcodes_base.TNoRelativeJobDependencies,
2205 b247c6fc Michael Hanselmann
                                   dependencies))
2206 b247c6fc Michael Hanselmann
2207 f1da30e6 Michael Hanselmann
    # Write to disk
2208 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
2209 f1da30e6 Michael Hanselmann
2210 20571a26 Guido Trotter
    self._queue_size += 1
2211 20571a26 Guido Trotter
2212 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
2213 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
2214 ac0930b9 Iustin Pop
2215 7beb1e53 Guido Trotter
    return job
2216 f1da30e6 Michael Hanselmann
2217 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2218 2971c913 Iustin Pop
  @_RequireOpenQueue
2219 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2220 2971c913 Iustin Pop
  def SubmitJob(self, ops):
2221 2971c913 Iustin Pop
    """Create and store a new job.
2222 2971c913 Iustin Pop

2223 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2224 2971c913 Iustin Pop

2225 2971c913 Iustin Pop
    """
2226 b247c6fc Michael Hanselmann
    (job_id, ) = self._NewSerialsUnlocked(1)
2227 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2228 7beb1e53 Guido Trotter
    return job_id
2229 2971c913 Iustin Pop
2230 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2231 2971c913 Iustin Pop
  @_RequireOpenQueue
2232 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2233 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
2234 2971c913 Iustin Pop
    """Create and store multiple jobs.
2235 2971c913 Iustin Pop

2236 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2237 2971c913 Iustin Pop

2238 2971c913 Iustin Pop
    """
2239 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2240 b247c6fc Michael Hanselmann
2241 b247c6fc Michael Hanselmann
    (results, added_jobs) = \
2242 b247c6fc Michael Hanselmann
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2243 7b5c4a69 Michael Hanselmann
2244 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked(added_jobs)
2245 2971c913 Iustin Pop
2246 2971c913 Iustin Pop
    return results
2247 2971c913 Iustin Pop
2248 b247c6fc Michael Hanselmann
  @staticmethod
2249 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
2250 b247c6fc Michael Hanselmann
    """Formats errors which occurred while submitting a job.
2251 b247c6fc Michael Hanselmann

2252 b247c6fc Michael Hanselmann
    """
2253 b247c6fc Michael Hanselmann
    return ("%s; opcodes %s" %
2254 b247c6fc Michael Hanselmann
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2255 b247c6fc Michael Hanselmann
2256 b247c6fc Michael Hanselmann
  @staticmethod
2257 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
2258 b247c6fc Michael Hanselmann
    """Resolves relative job IDs in dependencies.
2259 b247c6fc Michael Hanselmann

2260 b247c6fc Michael Hanselmann
    @type resolve_fn: callable
2261 b247c6fc Michael Hanselmann
    @param resolve_fn: Function to resolve a relative job ID
2262 b247c6fc Michael Hanselmann
    @type deps: list
2263 b247c6fc Michael Hanselmann
    @param deps: Dependencies
2264 4c27b231 Michael Hanselmann
    @rtype: tuple; (boolean, string or list)
2265 4c27b231 Michael Hanselmann
    @return: If successful (first tuple item), the returned list contains
2266 4c27b231 Michael Hanselmann
      resolved job IDs along with the requested status; if not successful,
2267 4c27b231 Michael Hanselmann
      the second element is an error message
2268 b247c6fc Michael Hanselmann

2269 b247c6fc Michael Hanselmann
    """
2270 b247c6fc Michael Hanselmann
    result = []
2271 b247c6fc Michael Hanselmann
2272 b247c6fc Michael Hanselmann
    for (dep_job_id, dep_status) in deps:
2273 b247c6fc Michael Hanselmann
      if ht.TRelativeJobId(dep_job_id):
2274 b247c6fc Michael Hanselmann
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2275 b247c6fc Michael Hanselmann
        try:
2276 b247c6fc Michael Hanselmann
          job_id = resolve_fn(dep_job_id)
2277 b247c6fc Michael Hanselmann
        except IndexError:
2278 b247c6fc Michael Hanselmann
          # Abort
2279 b247c6fc Michael Hanselmann
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2280 b247c6fc Michael Hanselmann
      else:
2281 b247c6fc Michael Hanselmann
        job_id = dep_job_id
2282 b247c6fc Michael Hanselmann
2283 b247c6fc Michael Hanselmann
      result.append((job_id, dep_status))
2284 b247c6fc Michael Hanselmann
2285 b247c6fc Michael Hanselmann
    return (True, result)
2286 b247c6fc Michael Hanselmann
2287 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2288 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2289 b247c6fc Michael Hanselmann

2290 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2291 b247c6fc Michael Hanselmann

2292 b247c6fc Michael Hanselmann
    """
2293 b247c6fc Michael Hanselmann
    results = []
2294 b247c6fc Michael Hanselmann
    added_jobs = []
2295 b247c6fc Michael Hanselmann
2296 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2297 b247c6fc Michael Hanselmann
      assert reljobid < 0
2298 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2299 b247c6fc Michael Hanselmann
2300 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2301 b247c6fc Michael Hanselmann
      for op in ops:
2302 580b1fdd Jose A. Lopes
        if getattr(op, opcodes_base.DEPEND_ATTR, None):
2303 b247c6fc Michael Hanselmann
          (status, data) = \
2304 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2305 b247c6fc Michael Hanselmann
                                         op.depends)
2306 b247c6fc Michael Hanselmann
          if not status:
2307 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2308 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2309 b247c6fc Michael Hanselmann
            break
2310 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2311 b247c6fc Michael Hanselmann
          op.depends = data
2312 b247c6fc Michael Hanselmann
      else:
2313 b247c6fc Michael Hanselmann
        try:
2314 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2315 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2316 b247c6fc Michael Hanselmann
          status = False
2317 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2318 b247c6fc Michael Hanselmann
        else:
2319 b247c6fc Michael Hanselmann
          status = True
2320 b247c6fc Michael Hanselmann
          data = job_id
2321 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2322 b247c6fc Michael Hanselmann
2323 b247c6fc Michael Hanselmann
      results.append((status, data))
2324 b247c6fc Michael Hanselmann
2325 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2326 b247c6fc Michael Hanselmann
2327 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2328 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2329 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2330 7b5c4a69 Michael Hanselmann

2331 7b5c4a69 Michael Hanselmann
    @type jobs: list
2332 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2333 7b5c4a69 Michael Hanselmann

2334 7b5c4a69 Michael Hanselmann
    """
2335 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2336 75d81fc8 Michael Hanselmann
2337 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2338 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2339 75d81fc8 Michael Hanselmann

2340 75d81fc8 Michael Hanselmann
    @type jobs: list
2341 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2342 75d81fc8 Michael Hanselmann

2343 75d81fc8 Michael Hanselmann
    """
2344 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2345 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2346 99fb250b Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs],
2347 99fb250b Michael Hanselmann
                             task_id=map(_GetIdAttr, jobs))
2348 7b5c4a69 Michael Hanselmann
2349 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2350 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2351 b95479a5 Michael Hanselmann

2352 76b62028 Iustin Pop
    @type job_id: int
2353 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2354 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2355 b95479a5 Michael Hanselmann

2356 b95479a5 Michael Hanselmann
    """
2357 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2358 b95479a5 Michael Hanselmann
2359 b95479a5 Michael Hanselmann
    # Try to load from disk
2360 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2361 c0f6d0d8 Michael Hanselmann
2362 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2363 b95479a5 Michael Hanselmann
2364 b95479a5 Michael Hanselmann
    if job:
2365 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2366 b95479a5 Michael Hanselmann
2367 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2368 b95479a5 Michael Hanselmann
2369 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2370 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
2371 ea03467c Iustin Pop
    """Update a job's on disk storage.
2372 ea03467c Iustin Pop

2373 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
2374 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
2375 ea03467c Iustin Pop
    nodes.
2376 ea03467c Iustin Pop

2377 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
2378 ea03467c Iustin Pop
    @param job: the changed job
2379 4c36bdf5 Guido Trotter
    @type replicate: boolean
2380 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
2381 ea03467c Iustin Pop

2382 ea03467c Iustin Pop
    """
2383 66bd7445 Michael Hanselmann
    if __debug__:
2384 66bd7445 Michael Hanselmann
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2385 66bd7445 Michael Hanselmann
      assert (finalized ^ (job.end_timestamp is None))
2386 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't update read-only job"
2387 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't update archived job"
2388 66bd7445 Michael Hanselmann
2389 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
2390 a182a3ed Michael Hanselmann
    data = serializer.DumpJson(job.Serialize())
2391 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
2392 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
2393 ac0930b9 Iustin Pop
2394 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2395 5c735209 Iustin Pop
                        timeout):
2396 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
2397 6c5a7090 Michael Hanselmann

2398 76b62028 Iustin Pop
    @type job_id: int
2399 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
2400 6c5a7090 Michael Hanselmann
    @type fields: list of strings
2401 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
2402 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
2403 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
2404 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
2405 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
2406 5c735209 Iustin Pop
    @type timeout: float
2407 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
2408 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
2409 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
2410 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
2411 ea03467c Iustin Pop

2412 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
2413 ea03467c Iustin Pop
        we instead return a special value,
2414 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2415 ea03467c Iustin Pop
        as such by the clients
2416 6c5a7090 Michael Hanselmann

2417 6c5a7090 Michael Hanselmann
    """
2418 04569469 Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2419 c0f6d0d8 Michael Hanselmann
                             writable=False)
2420 989a8bee Michael Hanselmann
2421 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
2422 989a8bee Michael Hanselmann
2423 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
2424 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
2425 dfe57c22 Michael Hanselmann
2426 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2427 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2428 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
2429 188c5e0a Michael Hanselmann
    """Cancels a job.
2430 188c5e0a Michael Hanselmann

2431 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
2432 ea03467c Iustin Pop

2433 76b62028 Iustin Pop
    @type job_id: int
2434 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
2435 188c5e0a Michael Hanselmann

2436 188c5e0a Michael Hanselmann
    """
2437 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
2438 188c5e0a Michael Hanselmann
2439 aebd0e4e Michael Hanselmann
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2440 aebd0e4e Michael Hanselmann
2441 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2442 4679547e Michael Hanselmann
  @_RequireOpenQueue
2443 4679547e Michael Hanselmann
  def ChangeJobPriority(self, job_id, priority):
2444 4679547e Michael Hanselmann
    """Changes a job's priority.
2445 4679547e Michael Hanselmann

2446 4679547e Michael Hanselmann
    @type job_id: int
2447 4679547e Michael Hanselmann
    @param job_id: ID of the job whose priority should be changed
2448 4679547e Michael Hanselmann
    @type priority: int
2449 4679547e Michael Hanselmann
    @param priority: New priority
2450 4679547e Michael Hanselmann

2451 4679547e Michael Hanselmann
    """
2452 4679547e Michael Hanselmann
    logging.info("Changing priority of job %s to %s", job_id, priority)
2453 4679547e Michael Hanselmann
2454 4679547e Michael Hanselmann
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
2455 4679547e Michael Hanselmann
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2456 4679547e Michael Hanselmann
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
2457 4679547e Michael Hanselmann
                                (priority, allowed))
2458 4679547e Michael Hanselmann
2459 4679547e Michael Hanselmann
    def fn(job):
2460 4679547e Michael Hanselmann
      (success, msg) = job.ChangePriority(priority)
2461 4679547e Michael Hanselmann
2462 4679547e Michael Hanselmann
      if success:
2463 4679547e Michael Hanselmann
        try:
2464 4679547e Michael Hanselmann
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2465 4679547e Michael Hanselmann
        except workerpool.NoSuchTask:
2466 4679547e Michael Hanselmann
          logging.debug("Job %s is not in workerpool at this time", job.id)
2467 4679547e Michael Hanselmann
2468 4679547e Michael Hanselmann
      return (success, msg)
2469 4679547e Michael Hanselmann
2470 4679547e Michael Hanselmann
    return self._ModifyJobUnlocked(job_id, fn)
2471 4679547e Michael Hanselmann
2472 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2473 aebd0e4e Michael Hanselmann
    """Modifies a job.
2474 aebd0e4e Michael Hanselmann

2475 aebd0e4e Michael Hanselmann
    @type job_id: int
2476 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2477 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2478 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2479 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2480 aebd0e4e Michael Hanselmann

2481 aebd0e4e Michael Hanselmann
    """
2482 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2483 188c5e0a Michael Hanselmann
    if not job:
2484 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2485 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2486 fbf0262f Michael Hanselmann
2487 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2488 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2489 c0f6d0d8 Michael Hanselmann
2490 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2491 188c5e0a Michael Hanselmann
2492 099b2870 Michael Hanselmann
    if success:
2493 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2494 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2495 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2496 fbf0262f Michael Hanselmann
2497 099b2870 Michael Hanselmann
    return (success, msg)
2498 fbf0262f Michael Hanselmann
2499 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2500 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2501 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2502 c609f802 Michael Hanselmann

2503 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2504 25e7b43f Iustin Pop
    @param jobs: Job objects
2505 d7fd1f28 Michael Hanselmann
    @rtype: int
2506 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2507 c609f802 Michael Hanselmann

2508 c609f802 Michael Hanselmann
    """
2509 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2510 d7fd1f28 Michael Hanselmann
    rename_files = []
2511 d7fd1f28 Michael Hanselmann
    for job in jobs:
2512 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2513 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't cancel archived job"
2514 c0f6d0d8 Michael Hanselmann
2515 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2516 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2517 d7fd1f28 Michael Hanselmann
        continue
2518 c609f802 Michael Hanselmann
2519 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2520 c609f802 Michael Hanselmann
2521 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2522 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2523 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2524 c609f802 Michael Hanselmann
2525 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2526 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2527 f1da30e6 Michael Hanselmann
2528 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2529 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2530 d7fd1f28 Michael Hanselmann
2531 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2532 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2533 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2534 20571a26 Guido Trotter
    # archived jobs to fix this.
2535 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2536 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2537 78d12585 Michael Hanselmann
2538 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2539 07cd723a Iustin Pop
  @_RequireOpenQueue
2540 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
2541 07cd723a Iustin Pop
    """Archives a job.
2542 07cd723a Iustin Pop

2543 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2544 ea03467c Iustin Pop

2545 76b62028 Iustin Pop
    @type job_id: int
2546 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
2547 78d12585 Michael Hanselmann
    @rtype: bool
2548 78d12585 Michael Hanselmann
    @return: Whether job was archived
2549 07cd723a Iustin Pop

2550 07cd723a Iustin Pop
    """
2551 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
2552 78d12585 Michael Hanselmann
2553 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2554 78d12585 Michael Hanselmann
    if not job:
2555 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2556 78d12585 Michael Hanselmann
      return False
2557 78d12585 Michael Hanselmann
2558 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
2559 07cd723a Iustin Pop
2560 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2561 07cd723a Iustin Pop
  @_RequireOpenQueue
2562 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
2563 07cd723a Iustin Pop
    """Archives all jobs based on age.
2564 07cd723a Iustin Pop

2565 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
2566 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
2567 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
2568 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
2569 07cd723a Iustin Pop

2570 07cd723a Iustin Pop
    @type age: int
2571 07cd723a Iustin Pop
    @param age: the minimum age in seconds
2572 07cd723a Iustin Pop

2573 07cd723a Iustin Pop
    """
2574 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
2575 07cd723a Iustin Pop
2576 07cd723a Iustin Pop
    now = time.time()
2577 f8ad5591 Michael Hanselmann
    end_time = now + timeout
2578 f8ad5591 Michael Hanselmann
    archived_count = 0
2579 f8ad5591 Michael Hanselmann
    last_touched = 0
2580 f8ad5591 Michael Hanselmann
2581 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
2582 d7fd1f28 Michael Hanselmann
    pending = []
2583 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
2584 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
2585 f8ad5591 Michael Hanselmann
2586 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
2587 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
2588 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
2589 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
2590 f8ad5591 Michael Hanselmann
        break
2591 f8ad5591 Michael Hanselmann
2592 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
2593 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
2594 f8ad5591 Michael Hanselmann
      if job:
2595 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
2596 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
2597 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
2598 f8ad5591 Michael Hanselmann
          else:
2599 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
2600 07cd723a Iustin Pop
        else:
2601 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
2602 f8ad5591 Michael Hanselmann
2603 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
2604 d7fd1f28 Michael Hanselmann
          pending.append(job)
2605 d7fd1f28 Michael Hanselmann
2606 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
2607 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
2608 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
2609 d7fd1f28 Michael Hanselmann
            pending = []
2610 f8ad5591 Michael Hanselmann
2611 d7fd1f28 Michael Hanselmann
    if pending:
2612 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
2613 07cd723a Iustin Pop
2614 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
2615 07cd723a Iustin Pop
2616 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
2617 e07f7f7a Michael Hanselmann
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2618 e07f7f7a Michael Hanselmann
                       namefield="id")
2619 e07f7f7a Michael Hanselmann
2620 0422250e Michael Hanselmann
    # Archived jobs are only looked at if the "archived" field is referenced
2621 0422250e Michael Hanselmann
    # either as a requested field or in the filter. By default archived jobs
2622 0422250e Michael Hanselmann
    # are ignored.
2623 0422250e Michael Hanselmann
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2624 0422250e Michael Hanselmann
2625 e07f7f7a Michael Hanselmann
    job_ids = qobj.RequestedNames()
2626 e07f7f7a Michael Hanselmann
2627 e07f7f7a Michael Hanselmann
    list_all = (job_ids is None)
2628 e07f7f7a Michael Hanselmann
2629 e07f7f7a Michael Hanselmann
    if list_all:
2630 e07f7f7a Michael Hanselmann
      # Since files are added to/removed from the queue atomically, there's no
2631 e07f7f7a Michael Hanselmann
      # risk of getting the job ids in an inconsistent state.
2632 0422250e Michael Hanselmann
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2633 e07f7f7a Michael Hanselmann
2634 e07f7f7a Michael Hanselmann
    jobs = []
2635 e07f7f7a Michael Hanselmann
2636 e07f7f7a Michael Hanselmann
    for job_id in job_ids:
2637 e07f7f7a Michael Hanselmann
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2638 e07f7f7a Michael Hanselmann
      if job is not None or not list_all:
2639 e07f7f7a Michael Hanselmann
        jobs.append((job_id, job))
2640 e07f7f7a Michael Hanselmann
2641 e07f7f7a Michael Hanselmann
    return (qobj, jobs, list_all)
2642 e07f7f7a Michael Hanselmann
2643 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
2644 e07f7f7a Michael Hanselmann
    """Returns a list of jobs in queue.
2645 e07f7f7a Michael Hanselmann

2646 e07f7f7a Michael Hanselmann
    @type fields: sequence
2647 e07f7f7a Michael Hanselmann
    @param fields: List of wanted fields
2648 e07f7f7a Michael Hanselmann
    @type qfilter: None or query2 filter (list)
2649 e07f7f7a Michael Hanselmann
    @param qfilter: Query filter
2650 e07f7f7a Michael Hanselmann

2651 e07f7f7a Michael Hanselmann
    """
2652 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2653 e07f7f7a Michael Hanselmann
2654 76b62028 Iustin Pop
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2655 e07f7f7a Michael Hanselmann
2656 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
2657 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
2658 e2715f69 Michael Hanselmann

2659 ea03467c Iustin Pop
    @type job_ids: list
2660 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
2661 ea03467c Iustin Pop
    @type fields: list
2662 ea03467c Iustin Pop
    @param fields: names of fields to return
2663 ea03467c Iustin Pop
    @rtype: list
2664 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
2665 ea03467c Iustin Pop
        the requested fields
2666 e2715f69 Michael Hanselmann

2667 e2715f69 Michael Hanselmann
    """
2668 76b62028 Iustin Pop
    # backwards compat:
2669 76b62028 Iustin Pop
    job_ids = [int(jid) for jid in job_ids]
2670 e07f7f7a Michael Hanselmann
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2671 e2715f69 Michael Hanselmann
2672 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2673 e2715f69 Michael Hanselmann
2674 76b62028 Iustin Pop
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2675 e2715f69 Michael Hanselmann
2676 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2677 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
2678 6d5ea385 Michael Hanselmann
    """Prepare to stop the job queue.
2679 6d5ea385 Michael Hanselmann

2680 6d5ea385 Michael Hanselmann
    Disables execution of jobs in the workerpool and returns whether there are
2681 6d5ea385 Michael Hanselmann
    any jobs currently running. If the latter is the case, the job queue is not
2682 6d5ea385 Michael Hanselmann
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2683 6d5ea385 Michael Hanselmann
    be called without interfering with any job. Queued and unfinished jobs will
2684 6d5ea385 Michael Hanselmann
    be resumed next time.
2685 6d5ea385 Michael Hanselmann

2686 6d5ea385 Michael Hanselmann
    Once this function has been called no new job submissions will be accepted
2687 6d5ea385 Michael Hanselmann
    (see L{_RequireNonDrainedQueue}).
2688 6d5ea385 Michael Hanselmann

2689 6d5ea385 Michael Hanselmann
    @rtype: bool
2690 6d5ea385 Michael Hanselmann
    @return: Whether there are any running jobs
2691 6d5ea385 Michael Hanselmann

2692 6d5ea385 Michael Hanselmann
    """
2693 6d5ea385 Michael Hanselmann
    if self._accepting_jobs:
2694 6d5ea385 Michael Hanselmann
      self._accepting_jobs = False
2695 6d5ea385 Michael Hanselmann
2696 6d5ea385 Michael Hanselmann
      # Tell worker pool to stop processing pending tasks
2697 6d5ea385 Michael Hanselmann
      self._wpool.SetActive(False)
2698 6d5ea385 Michael Hanselmann
2699 6d5ea385 Michael Hanselmann
    return self._wpool.HasRunningTasks()
2700 6d5ea385 Michael Hanselmann
2701 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
2702 942e2262 Michael Hanselmann
    """Returns whether jobs are accepted.
2703 942e2262 Michael Hanselmann

2704 942e2262 Michael Hanselmann
    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2705 942e2262 Michael Hanselmann
    queue is shutting down.
2706 942e2262 Michael Hanselmann

2707 942e2262 Michael Hanselmann
    @rtype: bool
2708 942e2262 Michael Hanselmann

2709 942e2262 Michael Hanselmann
    """
2710 942e2262 Michael Hanselmann
    return self._accepting_jobs
2711 942e2262 Michael Hanselmann
2712 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2713 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2714 e2715f69 Michael Hanselmann
  def Shutdown(self):
2715 e2715f69 Michael Hanselmann
    """Stops the job queue.
2716 e2715f69 Michael Hanselmann

2717 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2718 ea03467c Iustin Pop

2719 e2715f69 Michael Hanselmann
    """
2720 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
2721 85f03e0d Michael Hanselmann
2722 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
2723 a71f9c7d Guido Trotter
    self._queue_filelock = None