Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ f0583b66

History | View | Annotate | Download (79.2 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 76b62028 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 f1da30e6 Michael Hanselmann
from ganeti import opcodes
52 7a1ecaed Iustin Pop
from ganeti import errors
53 e2715f69 Michael Hanselmann
from ganeti import mcpu
54 7996a135 Iustin Pop
from ganeti import utils
55 04ab05ce Michael Hanselmann
from ganeti import jstore
56 c3f0a12f Iustin Pop
from ganeti import rpc
57 82b22e19 René Nussbaumer
from ganeti import runtime
58 a744b676 Manuel Franceschini
from ganeti import netutils
59 989a8bee Michael Hanselmann
from ganeti import compat
60 b95479a5 Michael Hanselmann
from ganeti import ht
61 a06c6ae8 Michael Hanselmann
from ganeti import query
62 a06c6ae8 Michael Hanselmann
from ganeti import qlang
63 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
64 cffbbae7 Michael Hanselmann
from ganeti import vcluster
65 e2715f69 Michael Hanselmann
66 fbf0262f Michael Hanselmann
67 1daae384 Iustin Pop
#: Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
68 e2715f69 Michael Hanselmann
69 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
70 ebb80afa Guido Trotter
_LOCK = "_lock"
71 ebb80afa Guido Trotter
_QUEUE = "_queue"
72 99bd4f0a Guido Trotter
73 99fb250b Michael Hanselmann
#: Retrieves "id" attribute
74 99fb250b Michael Hanselmann
_GetIdAttr = operator.attrgetter("id")
75 99fb250b Michael Hanselmann
76 498ae1cc Iustin Pop
77 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to signal that the current job has to be canceled.

  """
81 fbf0262f Michael Hanselmann
82 fbf0262f Michael Hanselmann
83 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Exception raised to abort a job because the job queue is shutting down.

  """
87 942e2262 Michael Hanselmann
88 942e2262 Michael Hanselmann
89 70552c46 Michael Hanselmann
def TimeStampNow():
  """Builds a timestamp for the current moment.

  @rtype: tuple
  @return: the current time, as returned by C{time.time()}, converted by
      L{utils.SplitTime} into the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
97 70552c46 Michael Hanselmann
98 70552c46 Michael Hanselmann
99 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
  """Sends a job queue file update, using the virtualized file name.

  @param runner: object providing C{call_jobqueue_update}
  @param names: passed through unchanged to C{call_jobqueue_update}
  @param file_name: file name, converted with L{vcluster.MakeVirtualPath}
      before being sent
  @param content: passed through unchanged to C{call_jobqueue_update}
  @return: whatever C{runner.call_jobqueue_update} returns

  """
  return runner.call_jobqueue_update(names,
                                     vcluster.MakeVirtualPath(file_name),
                                     content)
105 cffbbae7 Michael Hanselmann
106 cffbbae7 Michael Hanselmann
107 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Callable wrapper running a job query with a fixed list of fields.

  The query object is built once per instance, so the field list stays
  cached between calls; useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Prepares the query object for the given fields.

    @param fields: names of the job fields to query

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Runs the cached query against a single job.

    @param job: job object; its C{id} attribute is used as the key
    @return: the first (and only) row produced by the old-style query

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
124 a06c6ae8 Michael Hanselmann
125 a06c6ae8 Michael Hanselmann
126 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
127 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
128 e2715f69 Michael Hanselmann

129 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
130 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
131 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
132 ea03467c Iustin Pop
  @ivar status: the current status
133 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
134 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
135 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
137 f1048938 Iustin Pop

138 e2715f69 Michael Hanselmann
  """
139 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
140 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
141 66d895a8 Iustin Pop
               "__weakref__"]
142 66d895a8 Iustin Pop
143 85f03e0d Michael Hanselmann
  def __init__(self, op):
144 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
145 ea03467c Iustin Pop

146 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
147 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
148 ea03467c Iustin Pop

149 ea03467c Iustin Pop
    """
150 85f03e0d Michael Hanselmann
    self.input = op
151 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
152 85f03e0d Michael Hanselmann
    self.result = None
153 85f03e0d Michael Hanselmann
    self.log = []
154 70552c46 Michael Hanselmann
    self.start_timestamp = None
155 b9b5abcb Iustin Pop
    self.exec_timestamp = None
156 70552c46 Michael Hanselmann
    self.end_timestamp = None
157 f1da30e6 Michael Hanselmann
158 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
159 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160 8f5c488d Michael Hanselmann
161 f1da30e6 Michael Hanselmann
  @classmethod
162 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
163 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
164 ea03467c Iustin Pop

165 ea03467c Iustin Pop
    @type state: dict
166 ea03467c Iustin Pop
    @param state: the serialized state
167 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
168 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
169 ea03467c Iustin Pop

170 ea03467c Iustin Pop
    """
171 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
172 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173 85f03e0d Michael Hanselmann
    obj.status = state["status"]
174 85f03e0d Michael Hanselmann
    obj.result = state["result"]
175 85f03e0d Michael Hanselmann
    obj.log = state["log"]
176 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
177 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
178 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
179 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180 f1da30e6 Michael Hanselmann
    return obj
181 f1da30e6 Michael Hanselmann
182 f1da30e6 Michael Hanselmann
  def Serialize(self):
183 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
184 ea03467c Iustin Pop

185 ea03467c Iustin Pop
    @rtype: dict
186 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
187 ea03467c Iustin Pop

188 ea03467c Iustin Pop
    """
189 6c5a7090 Michael Hanselmann
    return {
190 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
191 6c5a7090 Michael Hanselmann
      "status": self.status,
192 6c5a7090 Michael Hanselmann
      "result": self.result,
193 6c5a7090 Michael Hanselmann
      "log": self.log,
194 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
195 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
196 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
197 8f5c488d Michael Hanselmann
      "priority": self.priority,
198 6c5a7090 Michael Hanselmann
      }
199 f1048938 Iustin Pop
200 e2715f69 Michael Hanselmann
201 e2715f69 Michael Hanselmann
class _QueuedJob(object):
202 e2715f69 Michael Hanselmann
  """In-memory job representation.
203 e2715f69 Michael Hanselmann

204 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
205 ea03467c Iustin Pop
  be taken care of by users of this class.
206 ea03467c Iustin Pop

207 ea03467c Iustin Pop
  @type queue: L{JobQueue}
208 ea03467c Iustin Pop
  @ivar queue: the parent queue
209 ea03467c Iustin Pop
  @ivar id: the job ID
210 ea03467c Iustin Pop
  @type ops: list
211 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
212 ea03467c Iustin Pop
  @type log_serial: int
213 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
214 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
215 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
216 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
217 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
218 e2715f69 Michael Hanselmann

219 e2715f69 Michael Hanselmann
  """
220 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
221 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
223 8a3cd185 Michael Hanselmann
               "__weakref__", "processor_lock", "writable", "archived"]
224 66d895a8 Iustin Pop
225 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
226 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
227 ea03467c Iustin Pop

228 ea03467c Iustin Pop
    @type queue: L{JobQueue}
229 ea03467c Iustin Pop
    @param queue: our parent queue
230 ea03467c Iustin Pop
    @type job_id: job_id
231 ea03467c Iustin Pop
    @param job_id: our job id
232 ea03467c Iustin Pop
    @type ops: list
233 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
234 ea03467c Iustin Pop
        in _QueuedOpCodes
235 c0f6d0d8 Michael Hanselmann
    @type writable: bool
236 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
237 ea03467c Iustin Pop

238 ea03467c Iustin Pop
    """
239 e2715f69 Michael Hanselmann
    if not ops:
240 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
241 e2715f69 Michael Hanselmann
242 85f03e0d Michael Hanselmann
    self.queue = queue
243 76b62028 Iustin Pop
    self.id = int(job_id)
244 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
245 6c5a7090 Michael Hanselmann
    self.log_serial = 0
246 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
247 c56ec146 Iustin Pop
    self.start_timestamp = None
248 c56ec146 Iustin Pop
    self.end_timestamp = None
249 8a3cd185 Michael Hanselmann
    self.archived = False
250 6c5a7090 Michael Hanselmann
251 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
252 fa4aa6b4 Michael Hanselmann
253 8a3cd185 Michael Hanselmann
    assert not self.archived, "New jobs can not be marked as archived"
254 8a3cd185 Michael Hanselmann
255 fa4aa6b4 Michael Hanselmann
  @staticmethod
256 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
257 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
258 fa4aa6b4 Michael Hanselmann

259 fa4aa6b4 Michael Hanselmann
    """
260 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
261 03b63608 Michael Hanselmann
    obj.ops_iter = None
262 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
263 f8a4adfa Michael Hanselmann
264 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
265 f8a4adfa Michael Hanselmann
    if writable:
266 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
267 f8a4adfa Michael Hanselmann
    else:
268 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
269 be760ba8 Michael Hanselmann
270 9fa2e150 Michael Hanselmann
  def __repr__(self):
271 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
272 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
273 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
274 9fa2e150 Michael Hanselmann
275 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
276 9fa2e150 Michael Hanselmann
277 f1da30e6 Michael Hanselmann
  @classmethod
278 8a3cd185 Michael Hanselmann
  def Restore(cls, queue, state, writable, archived):
279 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
280 ea03467c Iustin Pop

281 ea03467c Iustin Pop
    @type queue: L{JobQueue}
282 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
283 ea03467c Iustin Pop
    @type state: dict
284 ea03467c Iustin Pop
    @param state: the serialized state
285 c0f6d0d8 Michael Hanselmann
    @type writable: bool
286 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
287 8a3cd185 Michael Hanselmann
    @type archived: bool
288 8a3cd185 Michael Hanselmann
    @param archived: Whether job was already archived
289 ea03467c Iustin Pop
    @rtype: _JobQueue
290 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
291 ea03467c Iustin Pop

292 ea03467c Iustin Pop
    """
293 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
294 85f03e0d Michael Hanselmann
    obj.queue = queue
295 76b62028 Iustin Pop
    obj.id = int(state["id"])
296 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
297 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
298 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
299 8a3cd185 Michael Hanselmann
    obj.archived = archived
300 6c5a7090 Michael Hanselmann
301 6c5a7090 Michael Hanselmann
    obj.ops = []
302 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
303 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
304 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
305 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
306 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
307 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
308 6c5a7090 Michael Hanselmann
309 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
310 be760ba8 Michael Hanselmann
311 f1da30e6 Michael Hanselmann
    return obj
312 f1da30e6 Michael Hanselmann
313 f1da30e6 Michael Hanselmann
  def Serialize(self):
314 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
315 ea03467c Iustin Pop

316 ea03467c Iustin Pop
    @rtype: dict
317 ea03467c Iustin Pop
    @return: the serialized state
318 ea03467c Iustin Pop

319 ea03467c Iustin Pop
    """
320 f1da30e6 Michael Hanselmann
    return {
321 f1da30e6 Michael Hanselmann
      "id": self.id,
322 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
323 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
324 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
325 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
326 f1da30e6 Michael Hanselmann
      }
327 f1da30e6 Michael Hanselmann
328 85f03e0d Michael Hanselmann
  def CalcStatus(self):
329 ea03467c Iustin Pop
    """Compute the status of this job.
330 ea03467c Iustin Pop

331 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
332 ea03467c Iustin Pop
    based on their status, computes the job status.
333 ea03467c Iustin Pop

334 ea03467c Iustin Pop
    The algorithm is:
335 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
336 ea03467c Iustin Pop
        status will be the same
337 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
338 ea03467c Iustin Pop
          - waitlock
339 fbf0262f Michael Hanselmann
          - canceling
340 ea03467c Iustin Pop
          - running
341 ea03467c Iustin Pop

342 ea03467c Iustin Pop
        will determine the job status
343 ea03467c Iustin Pop

344 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
345 ea03467c Iustin Pop
        and the job status will be the same
346 ea03467c Iustin Pop

347 ea03467c Iustin Pop
    @return: the job status
348 ea03467c Iustin Pop

349 ea03467c Iustin Pop
    """
350 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
351 e2715f69 Michael Hanselmann
352 e2715f69 Michael Hanselmann
    all_success = True
353 85f03e0d Michael Hanselmann
    for op in self.ops:
354 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
355 e2715f69 Michael Hanselmann
        continue
356 e2715f69 Michael Hanselmann
357 e2715f69 Michael Hanselmann
      all_success = False
358 e2715f69 Michael Hanselmann
359 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
360 e2715f69 Michael Hanselmann
        pass
361 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
362 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
363 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
364 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
365 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
366 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
367 fbf0262f Michael Hanselmann
        break
368 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
369 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
370 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
371 f1da30e6 Michael Hanselmann
        break
372 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
373 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
374 4cb1d919 Michael Hanselmann
        break
375 e2715f69 Michael Hanselmann
376 e2715f69 Michael Hanselmann
    if all_success:
377 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
378 e2715f69 Michael Hanselmann
379 e2715f69 Michael Hanselmann
    return status
380 e2715f69 Michael Hanselmann
381 8f5c488d Michael Hanselmann
  def CalcPriority(self):
382 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
383 8f5c488d Michael Hanselmann

384 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
385 8f5c488d Michael Hanselmann
    priority is used.
386 8f5c488d Michael Hanselmann

387 8f5c488d Michael Hanselmann
    @rtype: int
388 8f5c488d Michael Hanselmann

389 8f5c488d Michael Hanselmann
    """
390 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
391 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
392 8f5c488d Michael Hanselmann
393 8f5c488d Michael Hanselmann
    if not priorities:
394 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
395 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
396 8f5c488d Michael Hanselmann
397 8f5c488d Michael Hanselmann
    return min(priorities)
398 8f5c488d Michael Hanselmann
399 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
400 ea03467c Iustin Pop
    """Selectively returns the log entries.
401 ea03467c Iustin Pop

402 ea03467c Iustin Pop
    @type newer_than: None or int
403 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
404 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
405 ea03467c Iustin Pop
        than this value
406 ea03467c Iustin Pop
    @rtype: list
407 ea03467c Iustin Pop
    @return: the list of the log entries selected
408 ea03467c Iustin Pop

409 ea03467c Iustin Pop
    """
410 6c5a7090 Michael Hanselmann
    if newer_than is None:
411 6c5a7090 Michael Hanselmann
      serial = -1
412 6c5a7090 Michael Hanselmann
    else:
413 6c5a7090 Michael Hanselmann
      serial = newer_than
414 6c5a7090 Michael Hanselmann
415 6c5a7090 Michael Hanselmann
    entries = []
416 6c5a7090 Michael Hanselmann
    for op in self.ops:
417 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
418 6c5a7090 Michael Hanselmann
419 6c5a7090 Michael Hanselmann
    return entries
420 6c5a7090 Michael Hanselmann
421 6a290889 Guido Trotter
  def GetInfo(self, fields):
422 6a290889 Guido Trotter
    """Returns information about a job.
423 6a290889 Guido Trotter

424 6a290889 Guido Trotter
    @type fields: list
425 6a290889 Guido Trotter
    @param fields: names of fields to return
426 6a290889 Guido Trotter
    @rtype: list
427 6a290889 Guido Trotter
    @return: list with one element for each field
428 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
429 6a290889 Guido Trotter
        has been passed
430 6a290889 Guido Trotter

431 6a290889 Guido Trotter
    """
432 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
433 6a290889 Guido Trotter
434 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
435 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
436 34327f51 Iustin Pop

437 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
438 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
439 34327f51 Iustin Pop
    finalised are not changed.
440 34327f51 Iustin Pop

441 34327f51 Iustin Pop
    @param status: a given opcode status
442 34327f51 Iustin Pop
    @param result: the opcode result
443 34327f51 Iustin Pop

444 34327f51 Iustin Pop
    """
445 747f6113 Michael Hanselmann
    not_marked = True
446 747f6113 Michael Hanselmann
    for op in self.ops:
447 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
448 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
449 747f6113 Michael Hanselmann
        continue
450 747f6113 Michael Hanselmann
      op.status = status
451 747f6113 Michael Hanselmann
      op.result = result
452 747f6113 Michael Hanselmann
      not_marked = False
453 34327f51 Iustin Pop
454 66bd7445 Michael Hanselmann
  def Finalize(self):
455 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
456 66bd7445 Michael Hanselmann

457 66bd7445 Michael Hanselmann
    """
458 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
459 66bd7445 Michael Hanselmann
460 099b2870 Michael Hanselmann
  def Cancel(self):
461 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
462 a0d2fe2c Michael Hanselmann

463 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
464 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
465 a0d2fe2c Michael Hanselmann
      as canceling and a text message
466 a0d2fe2c Michael Hanselmann

467 a0d2fe2c Michael Hanselmann
    """
468 099b2870 Michael Hanselmann
    status = self.CalcStatus()
469 099b2870 Michael Hanselmann
470 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
471 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
472 099b2870 Michael Hanselmann
                             "Job canceled by request")
473 66bd7445 Michael Hanselmann
      self.Finalize()
474 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
475 099b2870 Michael Hanselmann
476 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
477 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
478 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
479 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
480 099b2870 Michael Hanselmann
481 86b16e9d Michael Hanselmann
    else:
482 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
483 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
484 099b2870 Michael Hanselmann
485 4679547e Michael Hanselmann
  def ChangePriority(self, priority):
486 4679547e Michael Hanselmann
    """Changes the job priority.
487 4679547e Michael Hanselmann

488 4679547e Michael Hanselmann
    @type priority: int
489 4679547e Michael Hanselmann
    @param priority: New priority
490 4679547e Michael Hanselmann
    @rtype: tuple; (bool, string)
491 4679547e Michael Hanselmann
    @return: Boolean describing whether job's priority was successfully changed
492 4679547e Michael Hanselmann
      and a text message
493 4679547e Michael Hanselmann

494 4679547e Michael Hanselmann
    """
495 4679547e Michael Hanselmann
    status = self.CalcStatus()
496 4679547e Michael Hanselmann
497 4679547e Michael Hanselmann
    if status in constants.JOBS_FINALIZED:
498 4679547e Michael Hanselmann
      return (False, "Job %s is finished" % self.id)
499 4679547e Michael Hanselmann
    elif status == constants.JOB_STATUS_CANCELING:
500 4679547e Michael Hanselmann
      return (False, "Job %s is cancelling" % self.id)
501 4679547e Michael Hanselmann
    else:
502 4679547e Michael Hanselmann
      assert status in (constants.JOB_STATUS_QUEUED,
503 4679547e Michael Hanselmann
                        constants.JOB_STATUS_WAITING,
504 4679547e Michael Hanselmann
                        constants.JOB_STATUS_RUNNING)
505 4679547e Michael Hanselmann
506 4679547e Michael Hanselmann
      changed = False
507 4679547e Michael Hanselmann
      for op in self.ops:
508 4679547e Michael Hanselmann
        if (op.status == constants.OP_STATUS_RUNNING or
509 4679547e Michael Hanselmann
            op.status in constants.OPS_FINALIZED):
510 4679547e Michael Hanselmann
          assert not changed, \
511 4679547e Michael Hanselmann
            ("Found opcode for which priority should not be changed after"
512 4679547e Michael Hanselmann
             " priority has been changed for previous opcodes")
513 4679547e Michael Hanselmann
          continue
514 4679547e Michael Hanselmann
515 4679547e Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
516 4679547e Michael Hanselmann
                             constants.OP_STATUS_WAITING)
517 4679547e Michael Hanselmann
518 4679547e Michael Hanselmann
        changed = True
519 4679547e Michael Hanselmann
520 3c631ea2 Michael Hanselmann
        # Set new priority (doesn't modify opcode input)
521 4679547e Michael Hanselmann
        op.priority = priority
522 4679547e Michael Hanselmann
523 4679547e Michael Hanselmann
      if changed:
524 4679547e Michael Hanselmann
        return (True, ("Priorities of pending opcodes for job %s have been"
525 4679547e Michael Hanselmann
                       " changed to %s" % (self.id, priority)))
526 4679547e Michael Hanselmann
      else:
527 4679547e Michael Hanselmann
        return (False, "Job %s had no pending opcodes" % self.id)
528 4679547e Michael Hanselmann
529 f1048938 Iustin Pop
530 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks handed to the opcode processor for one opcode of a job.

  """
  def __init__(self, queue, job, op):
    """Stores the queue, job and opcode these callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts processing if cancellation or queue shutdown was requested.

    @raise CancelJob: if the opcode is in status "canceling"
    @raise QueueShutdown: if the queue no longer accepts jobs

    """
    # A pending cancel request takes precedence over a queue shutdown
    cancel_requested = (self._op.status == constants.OP_STATUS_CANCELING)
    if cancel_requested:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # Only ask the queue once we know the opcode isn't being canceled
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from waiting (for locks) to running.

    The mcpu code uses this as a notifier function and calls it when the LU
    is about to start its Exec() method. For the change to be visible to the
    end-user, the opcode must have been set to OP_STATUS_WAITING before
    Processor.ExecOpCode was called.

    """
    op = self._op

    assert op in self._job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Last chance to cancel before the opcode is marked as running
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Write out the updated job (default replication behaviour)
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds a log entry to the opcode while holding the queue lock (shared).

    """
    job = self._job

    # Every log entry gets the next job-wide serial number
    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the type defaults to
    L{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # Splitting the time makes serialization easier and avoids losing
    # precision
    self._AppendFeedback(utils.SplitTime(time.time()), log_type, log_msg)

  def CurrentPriority(self):
    """Returns the opcode's current priority.

    Also checks for a pending cancel request or queue shutdown, see
    L{_CheckCancel}.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Abort here if cancellation was requested
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue takes care of it
    return self._queue.SubmitManyJobs(jobs)
635 6a373640 Michael Hanselmann
636 031a3e57 Michael Hanselmann
637 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable deciding whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Remembers the client's previous view of the job.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @param prev_job_info: previous job info, as passed by the LUXI client
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job changed or is no longer
      queued, waiting or running; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()

    # A round-trip through the serializer can change types (e.g. tuple to
    # list) or lose precision. Applying it here gives the data the same
    # modifications as the values received from the client, so the
    # comparison below doesn't fail on insignificant differences.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(self._squery(job)))
    log_entries = serializer.LoadJson(
      serializer.DumpJson(job.GetLogEntries(self._prev_log_serial)))

    # A job which is no longer queued, waiting or running is always reported,
    # there won't be any further changes to wait for
    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_RUNNING,
                  constants.JOB_STATUS_WAITING):
      unchanged = not (job_info != self._prev_job_info or
                       (log_entries and
                        self._prev_log_serial != log_entries[0][0]))
      if unchanged:
        return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
689 989a8bee Michael Hanselmann
690 989a8bee Michael Hanselmann
691 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  """Waits for changes of a job file using inotify.

  """
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Sets up the inotify watch for the job file.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    watch_manager = _inotify_wm_cls()
    self._wm = watch_manager

    handler = asyncnotifier.SingleFileEventHandler(watch_manager,
                                                   self._OnInotify, filename)
    self._inotify_handler = handler

    notifier = pyinotify.Notifier(watch_manager, default_proc_fun=handler)
    self._notifier = notifier

    try:
      handler.enable()
    except Exception:
      # File descriptors aren't closed automatically by pyinotify, hence the
      # notifier must be stopped before passing the error on
      notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Re-enables the event handler if it is reported as disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is given in seconds and passed on multiplied by 1000
    have_events = notifier.check_events(timeout * 1000)
    if have_events:
      notifier.read_events()

    # Processing is done unconditionally, even without new events
    notifier.process_events()

    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
739 989a8bee Michael Hanselmann
740 989a8bee Michael Hanselmann
741 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  """Job waiter which creates the actual file waiter only when needed.

  """
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Remembers the job file; no waiter is created yet.

    @type filename: string
    @param filename: Path to job file

    """
    self._filename = filename
    self._waiter_cls = _waiter_cls
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if not self._filewaiter:
      # Lazy setup: the inotify setup cost is avoided when the job file has
      # already changed. On the first call the waiter is only created and
      # the function returns right away, letting the caller check the job
      # file again in case it changed since the last check. This avoids a
      # race condition.
      self._filewaiter = self._waiter_cls(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter.

    Does nothing if no waiter was created.

    """
    waiter = self._filewaiter
    if waiter:
      waiter.Close()
778 989a8bee Michael Hanselmann
779 989a8bee Michael Hanselmann
780 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the job status and log serial previously seen by a client, this class
  reports back once the current job status differs from it.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job and checks it for changes once.

    @param counter: Iterator whose values count the checks done so far
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning C{None} if the job is unchanged
    @raise errors.JobLost: if the job could not be loaded
    @raise utils.RetryAgain: if the job has not changed
    @return: Result of C{check_fn}

    """
    attempt = counter.next()
    if attempt > 0:
      # Except for the very first check, give the job some more time to
      # change again. Jobs generating many changes/messages perform better
      # this way.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is not None:
      return result

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: Result of the changes check; C{None} if the job was lost;
      L{constants.JOB_NOTCHANGED} if the timeout expired

    """
    counter = itertools.count()

    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        poll_fn = compat.partial(self._CheckForChanges,
                                 counter, job_load_fn, check_fn)
        return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # Always release the waiter, no matter how waiting ended
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
839 6c2549d6 Guido Trotter
840 6c2549d6 Guido Trotter
841 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Errors which are not instances of L{errors.GenericError} are converted to
  an L{errors.OpExecError} carrying their string representation first.

  @param err: Error to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
851 6760e4ed Michael Hanselmann
852 6760e4ed Michael Hanselmann
853 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
  """Adds a one-element look-ahead to a function returning timeouts.

  The wrapped function is called at most once per value handed out by
  L{Next}, no matter how often L{Peek} is called in between.

  """
  # Marker for "no value fetched yet". C{None} can't be used for this purpose
  # as it is a value the wrapped function may legitimately return; using it
  # made every L{Peek} following a C{None} result call the function again.
  _UNSET = object()

  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Function called without arguments to get the next timeout

    """
    self._fn = fn
    self._next = self._UNSET

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is self._UNSET:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    The value is kept and returned again by further calls and by the next
    call to L{Next}.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    # Forget the value so the next call fetches a new one
    self._next = self._UNSET
    return result
883 26d3fd2f Michael Hanselmann
884 26d3fd2f Michael Hanselmann
885 b80cc518 Michael Hanselmann
class _OpExecContext:
  """Execution context of one opcode within a job.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: Queued opcode; its C{input} attribute must provide C{Summary}
    @param index: Index of the opcode
    @param log_prefix: Prefix used for log messages about this opcode
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Dependencies are kept as a local copy which can be modified
    self.jobdeps = None
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: Whether the priority was increased

    """
    op = self.op

    # Have all retries been used up, i.e. is there no timeout left for the
    # next round of acquiring locks?
    if self._timeout_strategy.Peek() is None:
      if op.priority > constants.OP_PRIO_HIGHEST:
        logging.debug("Increasing priority")
        # Numerically lower values stand for higher priority
        op.priority -= 1
        self._ResetTimeoutStrategy()
        return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
935 26d3fd2f Michael Hanselmann
936 b80cc518 Michael Hanselmann
937 be760ba8 Michael Hanselmann
class _JobProcessor(object):
938 75d81fc8 Michael Hanselmann
  (DEFER,
939 75d81fc8 Michael Hanselmann
   WAITDEP,
940 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
941 75d81fc8 Michael Hanselmann
942 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores the objects needed for processing a job.

    @param queue: Job queue
    @param opexec_fn: Function used for executing opcodes
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout strategy

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
951 be760ba8 Michael Hanselmann
952 be760ba8 Michael Hanselmann
  @staticmethod
953 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Context for the first opcode not yet finalized
    @raise errors.ProgrammerError: if there is no opcode left or an opcode
      is marked as running

    """
    # The iterator is stored in the job and acts as a cache, later lookups
    # continue where the previous one stopped
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Walk through the opcodes until one to run is found
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # An opcode must not be marked as running at this point
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
      opctx = _OpExecContext(op, idx, log_prefix, timeout_strategy_factory)

      if op.status in constants.OPS_FINALIZED:
        # The job was partially completed before the master daemon was shut
        # down, so some opcodes can be expected to have completed
        # successfully already (had any of them failed, the whole job should
        # have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx
991 be760ba8 Michael Hanselmann
992 be760ba8 Michael Hanselmann
  @staticmethod
993 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether status or a timestamp was modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    # The result is reset unconditionally; this alone doesn't count as a
    # modification
    op.result = None

    modified = False

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      modified = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      modified = True

    if job.start_timestamp is None:
      # The job starts together with its first started opcode
      job.start_timestamp = op.start_timestamp
      modified = True

    assert op.status == constants.OP_STATUS_WAITING

    return modified
1027 be760ba8 Michael Hanselmann
1028 b95479a5 Michael Hanselmann
  @staticmethod
1029 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are handled in order; those which allow continuing are
    removed from C{opctx.jobdeps}.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Dependency is done with, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if depresult == _JobDependencyManager.WAIT:
        # A notification is needed; the dependency tracker will re-add the
        # job to the workerpool
        return True

      if depresult == _JobDependencyManager.CANCEL:
        # The job depended upon was cancelled, so this one is cancelled too
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (_JobDependencyManager.WRONGSTATUS,
                       _JobDependencyManager.ERROR):
        # The job depended upon failed or an error occurred, hence this job
        # has to fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No dependencies left to wait for
    return False
1083 b95479a5 Michael Hanselmann
1084 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1085 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1086 be760ba8 Michael Hanselmann

1087 be760ba8 Michael Hanselmann
    """
1088 b80cc518 Michael Hanselmann
    op = opctx.op
1089 b80cc518 Michael Hanselmann
1090 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1091 be760ba8 Michael Hanselmann
1092 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1093 26d3fd2f Michael Hanselmann
1094 be760ba8 Michael Hanselmann
    try:
1095 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1096 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1097 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1098 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1099 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1100 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1101 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1102 9e49dfc5 Michael Hanselmann
1103 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1104 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1105 9e49dfc5 Michael Hanselmann
1106 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1107 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1108 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1109 9e49dfc5 Michael Hanselmann
1110 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1111 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1112 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1113 942e2262 Michael Hanselmann
1114 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1115 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1116 be760ba8 Michael Hanselmann
    except CancelJob:
1117 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1118 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1119 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1120 942e2262 Michael Hanselmann
1121 942e2262 Michael Hanselmann
    except QueueShutdown:
1122 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1123 942e2262 Michael Hanselmann
1124 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1125 942e2262 Michael Hanselmann
1126 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1127 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1128 942e2262 Michael Hanselmann
1129 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1130 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1131 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1132 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1133 be760ba8 Michael Hanselmann
    else:
1134 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1135 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1136 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1137 be760ba8 Michael Hanselmann
1138 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    Processes at most one opcode of the job per call. The queue lock is
    acquired in shared mode for the duration of the call, except while the
    opcode itself is being executed (see L{_ExecOpCodeUnlocked}).

    @param _nextop_fn: Callback function for tests; only invoked (without
      arguments) when C{__debug__} is set and a new opcode is about to be
      looked up
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
    @raise errors.ProgrammerError: if the opcode ends up in an unexpected
      status

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        # Resume the context kept from the previous round; it is stored again
        # further down if the opcode still can't be finished
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check: none of the following opcodes may have been started
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      # Set to a true value if the dependency manager takes over re-scheduling
      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        # Only execute the opcode if no dependency is outstanding and the
        # dependency check neither cancelled nor failed it
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          # The queue lock is dropped while the opcode runs and re-acquired
          # afterwards, no matter how the execution ended
          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          # All opcodes which haven't finished yet fail along with this one
          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      # The queue lock acquired above must be given back even if the
      # invariant below is violated; checking first and releasing second
      # would leave the lock held whenever the assertion fails
      try:
        assert job.writable, "Job became read-only while being processed"
      finally:
        queue.release()
1330 be760ba8 Michael Hanselmann
1331 be760ba8 Michael Hanselmann
1332 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts on the value returned by a L{_JobProcessor} call for a job.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    with the job's ID once the job has finished
  @param job: Job which was processed
  @param result: One of L{_JobProcessor.FINISHED}, L{_JobProcessor.DEFER}
    and L{_JobProcessor.WAITDEP}
  @raise workerpool.DeferTask: if the job needs to be scheduled again; the
    exception carries the job's current priority
  @raise errors.ProgrammerError: if C{result} is not a known value

  """
  if result == _JobProcessor.FINISHED:
    # Jobs depending on this one can go ahead now
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Ask for the job to be scheduled once more
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to be done, the dependency manager re-schedules the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1353 df5a5730 Michael Hanselmann
1354 df5a5730 Michael Hanselmann
1355 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker processing jobs handed out by the job queue's worker pool.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding the job's processor lock.

    A job which registered for a dependency can be notified by that
    dependency before the first worker is done with it, in which case the job
    is in the task list more than once. The per-job lock makes sure only one
    worker is active on a job at any time.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Names the current task after the job and, if given, the opcode.

      """
      return self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcodes are executed through a wrapper which keeps the task name in
    # sync with the opcode being run
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               proc.ExecOpCode)

    depmgr = queue.depmgr
    processor = _JobProcessor(queue, execop_fn, job)

    _EvaluateJobProcessorResult(depmgr, job, processor())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the task name set to describe it.

    The name is set using the opcode before execution and reset (by passing
    C{None}) afterwards, also when execution raises an exception.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode; passed to C{setname_fn} and, followed by all remaining
               arguments, to C{execop_fn}
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the name for a worker thread processing a job.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @param op: Opcode being executed, or a false value if there is none
    @rtype: string
    @return: C{"Job<id>"}, followed by a slash and the opcode's tiny summary
      if an opcode was given

    """
    name = "Job%s" % job.id

    if op:
      name = "/".join([name, op.TinySummary()])

    return name
1427 0aeeb6e3 Michael Hanselmann
1428 e2715f69 Michael Hanselmann
1429 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool processing the jobs of a job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it belongs to.

    The base class is initialized with C{"Jq"}, L{JOBQUEUE_THREADS} and
    L{_JobQueueWorker}.

    @type queue: L{JobQueue}
    @param queue: Queue object, made available as C{self.queue}

    """
    base_args = ("Jq", JOBQUEUE_THREADS, _JobQueueWorker)
    super(_JobQueueWorkerPool, self).__init__(*base_args)

    self.queue = queue
1438 e2715f69 Michael Hanselmann
1439 e2715f69 Michael Hanselmann
1440 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1441 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1442 b95479a5 Michael Hanselmann

1443 b95479a5 Michael Hanselmann
  """
1444 b95479a5 Michael Hanselmann
  (WAIT,
1445 b95479a5 Michael Hanselmann
   ERROR,
1446 b95479a5 Michael Hanselmann
   CANCEL,
1447 b95479a5 Michael Hanselmann
   CONTINUE,
1448 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1449 b95479a5 Michael Hanselmann
1450 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1451 b95479a5 Michael Hanselmann
    """Initializes this class.
1452 b95479a5 Michael Hanselmann

1453 b95479a5 Michael Hanselmann
    """
1454 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1455 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1456 b95479a5 Michael Hanselmann
1457 b95479a5 Michael Hanselmann
    self._waiters = {}
1458 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1459 b95479a5 Michael Hanselmann
1460 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1461 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1462 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1463 fcb21ad7 Michael Hanselmann

1464 fcb21ad7 Michael Hanselmann
    @type requested: set
1465 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1466 fcb21ad7 Michael Hanselmann

1467 fcb21ad7 Michael Hanselmann
    """
1468 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1469 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1470 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1471 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1472 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1473 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1474 fcb21ad7 Michael Hanselmann
            if waiters]
1475 fcb21ad7 Michael Hanselmann
1476 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1477 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1478 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1479 b95479a5 Michael Hanselmann

1480 b95479a5 Michael Hanselmann
    """
1481 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1482 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1483 b95479a5 Michael Hanselmann
1484 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1485 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1486 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1487 b95479a5 Michael Hanselmann

1488 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1489 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1490 b95479a5 Michael Hanselmann

1491 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1492 b95479a5 Michael Hanselmann
    @param job: Job object
1493 76b62028 Iustin Pop
    @type dep_job_id: int
1494 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1495 b95479a5 Michael Hanselmann
    @type dep_status: list
1496 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1497 b95479a5 Michael Hanselmann

1498 b95479a5 Michael Hanselmann
    """
1499 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1500 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1501 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1502 b95479a5 Michael Hanselmann
1503 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1504 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1505 b95479a5 Michael Hanselmann
1506 b95479a5 Michael Hanselmann
    # Get status of dependency job
1507 b95479a5 Michael Hanselmann
    try:
1508 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1509 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1510 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1511 b95479a5 Michael Hanselmann
1512 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1513 b95479a5 Michael Hanselmann
1514 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1515 b95479a5 Michael Hanselmann
1516 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1517 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1518 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1519 b95479a5 Michael Hanselmann
      return (self.WAIT,
1520 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1521 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1522 b95479a5 Michael Hanselmann
1523 b95479a5 Michael Hanselmann
    # Remove from waiters list
1524 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1525 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1526 b95479a5 Michael Hanselmann
1527 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1528 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1529 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1530 b95479a5 Michael Hanselmann
1531 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1532 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1533 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1534 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1535 b95479a5 Michael Hanselmann
1536 b95479a5 Michael Hanselmann
    else:
1537 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1538 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1539 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1540 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1541 b95479a5 Michael Hanselmann
1542 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1543 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1544 37d76f1e Michael Hanselmann

1545 37d76f1e Michael Hanselmann
    """
1546 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1547 37d76f1e Michael Hanselmann
                   if not waiters]:
1548 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1549 37d76f1e Michael Hanselmann
1550 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1551 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1552 b95479a5 Michael Hanselmann

1553 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1554 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1555 76b62028 Iustin Pop
    @type job_id: int
1556 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1557 b95479a5 Michael Hanselmann

1558 b95479a5 Michael Hanselmann
    """
1559 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1560 b95479a5 Michael Hanselmann
1561 37d76f1e Michael Hanselmann
    self._lock.acquire()
1562 37d76f1e Michael Hanselmann
    try:
1563 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1564 37d76f1e Michael Hanselmann
1565 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1566 37d76f1e Michael Hanselmann
    finally:
1567 37d76f1e Michael Hanselmann
      self._lock.release()
1568 37d76f1e Michael Hanselmann
1569 b95479a5 Michael Hanselmann
    if jobs:
1570 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1571 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1572 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1573 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1574 b95479a5 Michael Hanselmann
1575 b95479a5 Michael Hanselmann
1576 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1577 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1578 ea03467c Iustin Pop

1579 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1580 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1581 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1582 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1583 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1584 ea03467c Iustin Pop

1585 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1586 f1da30e6 Michael Hanselmann

1587 6c881c52 Iustin Pop
  Example::
1588 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1589 6c881c52 Iustin Pop
    @_RequireOpenQueue
1590 6c881c52 Iustin Pop
    def Example(self):
1591 6c881c52 Iustin Pop
      pass
1592 db37da70 Michael Hanselmann

1593 6c881c52 Iustin Pop
  """
1594 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1595 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1596 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1597 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1598 6c881c52 Iustin Pop
  return wrapper
1599 db37da70 Michael Hanselmann
1600 db37da70 Michael Hanselmann
1601 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1602 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1603 c8d0be94 Michael Hanselmann

1604 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1605 c8d0be94 Michael Hanselmann

1606 c8d0be94 Michael Hanselmann
  """
1607 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1608 c8d0be94 Michael Hanselmann
    """Wrapper function.
1609 c8d0be94 Michael Hanselmann

1610 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1611 c8d0be94 Michael Hanselmann

1612 c8d0be94 Michael Hanselmann
    """
1613 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1614 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1615 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1616 c8d0be94 Michael Hanselmann
    if self._drained:
1617 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1618 6d5ea385 Michael Hanselmann
1619 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1620 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1621 6d5ea385 Michael Hanselmann
1622 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1623 c8d0be94 Michael Hanselmann
  return wrapper
1624 c8d0be94 Michael Hanselmann
1625 c8d0be94 Michael Hanselmann
1626 6c881c52 Iustin Pop
class JobQueue(object):
1627 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1628 db37da70 Michael Hanselmann

1629 6c881c52 Iustin Pop
  """
1630 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the in-memory state of the queue, initializes and locks the queue
    through L{jstore}, reads the serial number and collects the master
    candidates. Afterwards the jobs currently on disk are inspected (see
    L{_InspectQueue}), either for restarting them (if they were queued) or
    for aborting them (if they were already running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it, be it in shared or in
    # exclusive mode.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Jobs are accepted unless told otherwise
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Serial number as stored on disk
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Initial map of master candidates (name to primary IP address)
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # The master node itself is not part of the map
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool; its workers are terminated again should the
    # inspection of the queue fail
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise
1696 711b5124 Michael Hanselmann
1697 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    Queued jobs are restarted as they are. Waiting jobs get their unfinished
    opcodes reset to "queued" and are restarted, too. Running and canceling
    jobs get their unfinished opcodes marked as failed and are finalized.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    to_restart = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    last_report = time.time()
    for (idx, job_id) in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        to_restart.append(job)
        continue

      if status not in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING):
        # Nothing to do for jobs in any other state
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        to_restart.append(job)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

      self.UpdateJobUnlocked(job)

    if to_restart:
      logging.info("Restarting %s jobs", len(to_restart))
      self._EnqueueJobsUnlocked(to_restart)

    logging.info("Job queue inspection finished")
1753 85f03e0d Michael Hanselmann
1754 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Creates a job queue RPC runner for this queue's context.

    @param address_list: passed on unchanged to L{rpc.JobQueueRunner}
    @return: the newly created runner

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1759 fb1ffbca Michael Hanselmann
1760 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Registers a node with the queue and replicates the queue to it.

    The queue directory on the node is purged first. Nodes which are not
    master candidates are only removed from the map of known nodes. Master
    candidates receive all non-archived job files, the serial file and the
    current drain flag before being added to that map. RPC failures are
    logged, but do not abort the operation.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    purge_result = self._GetRpc(None).call_jobqueue_purge(node_name)
    purge_msg = purge_result.fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, purge_msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication of the
      # job files
      self._nodes.pop(node_name, None)
      return

    # The whole queue excluding archived jobs, followed by the current serial
    # file
    job_files = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]
    upload_files = job_files + [pathutils.JOB_QUEUE_SERIAL_FILE]

    # Static address list
    addrs = [node.primary_ip]

    for file_name in upload_files:
      content = utils.ReadFile(file_name)

      upload_result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                                    file_name, content)
      upload_msg = upload_result[node_name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, upload_msg)

    # Set queue drained flag
    runner = self._GetRpc(addrs)
    drain_result = runner.call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    drain_msg = drain_result[node_name].fail_msg
    if drain_msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, drain_msg)

    self._nodes[node_name] = node.primary_ip
1815 d2e03a33 Michael Hanselmann
1816 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not known to the queue
      pass
1826 23752136 Michael Hanselmann
1827 7e950d31 Iustin Pop
  @staticmethod
1828 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1829 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1830 ea03467c Iustin Pop

1831 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1832 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1833 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1834 ea03467c Iustin Pop

1835 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1836 ea03467c Iustin Pop
    @type nodes: list
1837 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1838 ea03467c Iustin Pop
    @type failmsg: str
1839 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1840 ea03467c Iustin Pop

1841 ea03467c Iustin Pop
    """
1842 e74798c1 Michael Hanselmann
    failed = []
1843 e74798c1 Michael Hanselmann
    success = []
1844 e74798c1 Michael Hanselmann
1845 e74798c1 Michael Hanselmann
    for node in nodes:
1846 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1847 c8457ce7 Iustin Pop
      if msg:
1848 e74798c1 Michael Hanselmann
        failed.append(node)
1849 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1850 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1851 c8457ce7 Iustin Pop
      else:
1852 c8457ce7 Iustin Pop
        success.append(node)
1853 e74798c1 Michael Hanselmann
1854 e74798c1 Michael Hanselmann
    # +1 for the master node
1855 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1856 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1857 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1858 e74798c1 Michael Hanselmann
1859 99aabbed Iustin Pop
  def _GetNodeIp(self):
1860 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1861 99aabbed Iustin Pop

1862 ea03467c Iustin Pop
    @rtype: (list, list)
1863 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1864 ea03467c Iustin Pop
        names and the second one with the node addresses
1865 ea03467c Iustin Pop

1866 99aabbed Iustin Pop
    """
1867 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1868 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1869 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1870 99aabbed Iustin Pop
    return name_list, addr_list
1871 99aabbed Iustin Pop
1872 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and optionally replicates it to all nodes.

    The contents of the file on the local node are replaced first. If
    requested, the new contents are then sent to all the other nodes we
    have; failures of that step are only logged.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if not replicate:
      # Local change only
      return

    (names, addrs) = self._GetNodeIp()
    result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1895 23752136 Michael Hanselmann
1896 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename the files in the local queue directory
    and then replicate the renames to all the other nodes we have.
    Failures on other nodes are only logged.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (old, new) in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)

    # "rename" is wrapped in a tuple so that it is always formatted as a
    # single value; passed directly, a tuple of pairs would be unpacked as
    # separate format arguments
    self._CheckRpcResult(result, self._nodes,
                         "Renaming files (%r)" % (rename, ))
1914 abc1f2ce Michael Hanselmann
1915 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    serial number is written to the serial file (and replicated) before the
    in-memory value is updated.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # Range of serial numbers handed out by this call
    first_serial = self._last_serial + 1
    new_serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % new_serial, True)

    job_ids = [jstore.FormatJobID(value)
               for value in range(first_serial, new_serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = new_serial

    assert len(job_ids) == count

    return job_ids
1944 f1da30e6 Michael Hanselmann
1945 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: int
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    filename = "job-%s" % job_id
    return utils.PathJoin(pathutils.QUEUE_DIR, filename)
1956 f1da30e6 Michael Hanselmann
1957 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: int
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = jstore.GetArchiveDirectory(job_id)
    filename = "job-%s" % job_id
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                          filename)
1970 0cb94105 Michael Hanselmann
1971 cb66225d Michael Hanselmann
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    The queue directory always comes first. If requested, it is followed by
    one entry for each visible file in the archive directory.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    directories = [pathutils.QUEUE_DIR]

    if archived:
      archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
      for entry in utils.ListVisibleFiles(archive_root):
        directories.append(utils.PathJoin(archive_root, entry))

    return directories
1988 0422250e Michael Hanselmann
1989 0422250e Michael Hanselmann
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to also look into the directories for archived
        jobs
    @rtype: list
    @return: the list of job IDs

    """
    job_ids = []

    for directory in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(directory):
        match = constants.JOB_FILE_RE.match(filename)
        if not match:
          # Not a job file
          continue
        job_ids.append(int(match.group(1)))

    if sort:
      job_ids.sort()

    return job_ids
2014 911a495b Iustin Pop
2015 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache. A job file which can't
    be parsed is moved to the archive and C{None} is returned.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      assert cached.writable, "Found read-only job in memcache"
      return cached

    try:
      job = self._LoadJobFromDisk(job_id, False)
    except errors.JobFileCorrupted:
      current_path = self._GetJobPath(job_id)
      archive_path = self._GetArchivedJobPath(job_id)
      if current_path != archive_path:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(current_path, archive_path)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      # No job file found
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
2055 162c8636 Guido Trotter
2056 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2057 162c8636 Guido Trotter
    """Load the given job file from disk.
2058 162c8636 Guido Trotter

2059 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2060 162c8636 Guido Trotter

2061 76b62028 Iustin Pop
    @type job_id: int
2062 162c8636 Guido Trotter
    @param job_id: job identifier
2063 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2064 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2065 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
2066 162c8636 Guido Trotter
    @return: either None or the job object
2067 162c8636 Guido Trotter

2068 162c8636 Guido Trotter
    """
2069 8a3cd185 Michael Hanselmann
    path_functions = [(self._GetJobPath, False)]
2070 194c8ca4 Michael Hanselmann
2071 194c8ca4 Michael Hanselmann
    if try_archived:
2072 8a3cd185 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, True))
2073 194c8ca4 Michael Hanselmann
2074 194c8ca4 Michael Hanselmann
    raw_data = None
2075 8a3cd185 Michael Hanselmann
    archived = None
2076 194c8ca4 Michael Hanselmann
2077 8a3cd185 Michael Hanselmann
    for (fn, archived) in path_functions:
2078 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
2079 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
2080 194c8ca4 Michael Hanselmann
      try:
2081 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
2082 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
2083 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
2084 194c8ca4 Michael Hanselmann
          raise
2085 194c8ca4 Michael Hanselmann
      else:
2086 194c8ca4 Michael Hanselmann
        break
2087 194c8ca4 Michael Hanselmann
2088 194c8ca4 Michael Hanselmann
    if not raw_data:
2089 194c8ca4 Michael Hanselmann
      return None
2090 13998ef2 Michael Hanselmann
2091 c0f6d0d8 Michael Hanselmann
    if writable is None:
2092 8a3cd185 Michael Hanselmann
      writable = not archived
2093 c0f6d0d8 Michael Hanselmann
2094 94ed59a5 Iustin Pop
    try:
2095 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2096 8a3cd185 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable, archived)
2097 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2098 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2099 94ed59a5 Iustin Pop
2100 ac0930b9 Iustin Pop
    return job
2101 f1da30e6 Michael Hanselmann
2102 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk without raising on bad files.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed on unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2122 0f9c08dc Guido Trotter
2123 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Refresh the cached queue size.

    The size is recomputed as the number of job IDs returned by
    L{_GetJobIDsUnlocked} (sorting is not needed for counting).

    """
    current_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(current_ids)
2128 20571a26 Guido Trotter
2129 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Set or clear the queue's drain flag, locally and on all nodes.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always C{True}

    """
    # First the local state: on-disk flag and the in-memory copy
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    # Then push the same setting to the other nodes via RPC
    (node_names, node_addrs) = self._GetNodeIp()
    runner = self._GetRpc(node_addrs)
    rpc_res = runner.call_jobqueue_set_drain_flag(node_names, drain_flag)

    self._CheckRpcResult(rpc_res, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True
2151 3ccafd0e Iustin Pop
2152 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Validate, persist and cache a newly submitted job.

    A L{_QueuedJob} is built from the opcodes, every opcode is checked for
    a valid submit priority and for the absence of relative job
    dependencies, and only then is the job written out via
    L{UpdateJobUnlocked}, counted in the cached queue size and stored in
    the in-memory cache. Putting the job on the worker pool is left to the
    caller.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for (pos, queued_op) in enumerate(job.ops):
      # Only priorities explicitly allowed at submission time are accepted
      if queued_op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        valid_prios = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" %
                                  (pos, queued_op.priority, valid_prios))

      # Relative dependencies must have been resolved before this point
      deps = getattr(queued_op.input, opcodes.DEPEND_ATTR, None)
      deps_ok = opcodes.TNoRelativeJobDependencies(deps)
      if not deps_ok:
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (pos, opcodes.TNoRelativeJobDependencies,
                                   deps))

    # Persist the job before accounting for it
    self.UpdateJobUnlocked(job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
2198 f1da30e6 Michael Hanselmann
2199 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Submit a single job and hand it to the worker pool.

    A fresh job ID is allocated, the job is created through
    L{_SubmitJobUnlocked} and then enqueued.

    @see: L{_SubmitJobUnlocked}
    @return: the ID of the newly submitted job

    """
    (job_id, ) = self._NewSerialsUnlocked(1)

    new_job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobsUnlocked([new_job])

    return job_id
2211 2971c913 Iustin Pop
2212 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Submit several jobs at once and hand them to the worker pool.

    One job ID per entry in C{jobs} is allocated up front; the actual
    creation is delegated to L{_SubmitManyJobsUnlocked} (with no previously
    submitted job IDs), and only the jobs that were created successfully
    are enqueued.

    @see: L{_SubmitJobUnlocked}
    @return: the per-job results as returned by L{_SubmitManyJobsUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(len(jobs))

    submitted = self._SubmitManyJobsUnlocked(jobs, new_ids, [])
    (results, added_jobs) = submitted

    self._EnqueueJobsUnlocked(added_jobs)

    return results
2229 2971c913 Iustin Pop
2230 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Build the error text reported for a failed job submission.

    @param msg: the error message
    @param ops: the opcodes of the job; each one's C{Summary()} is appended
    @return: message followed by the comma-joined opcode summaries

    """
    op_summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, op_summaries)
2237 b247c6fc Michael Hanselmann
2238 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Replace relative job IDs in a dependency list by resolved ones.

    Entries whose job ID is not relative are copied unchanged. Resolution
    stops at the first relative ID for which C{resolve_fn} raises
    C{IndexError}.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Already absolute, keep as is
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0

      try:
        absolute_id = resolve_fn(dep_job_id)
      except IndexError:
        # Give up on the whole list
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((absolute_id, dep_status))

    return (True, resolved)
2268 b247c6fc Michael Hanselmann
2269 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2270 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2271 b247c6fc Michael Hanselmann

2272 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2273 b247c6fc Michael Hanselmann

2274 b247c6fc Michael Hanselmann
    """
2275 b247c6fc Michael Hanselmann
    results = []
2276 b247c6fc Michael Hanselmann
    added_jobs = []
2277 b247c6fc Michael Hanselmann
2278 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2279 b247c6fc Michael Hanselmann
      assert reljobid < 0
2280 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2281 b247c6fc Michael Hanselmann
2282 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2283 b247c6fc Michael Hanselmann
      for op in ops:
2284 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2285 b247c6fc Michael Hanselmann
          (status, data) = \
2286 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2287 b247c6fc Michael Hanselmann
                                         op.depends)
2288 b247c6fc Michael Hanselmann
          if not status:
2289 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2290 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2291 b247c6fc Michael Hanselmann
            break
2292 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2293 b247c6fc Michael Hanselmann
          op.depends = data
2294 b247c6fc Michael Hanselmann
      else:
2295 b247c6fc Michael Hanselmann
        try:
2296 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2297 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2298 b247c6fc Michael Hanselmann
          status = False
2299 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2300 b247c6fc Michael Hanselmann
        else:
2301 b247c6fc Michael Hanselmann
          status = True
2302 b247c6fc Michael Hanselmann
          data = job_id
2303 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2304 b247c6fc Michael Hanselmann
2305 b247c6fc Michael Hanselmann
      results.append((status, data))
2306 b247c6fc Michael Hanselmann
2307 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2308 b247c6fc Michael Hanselmann
2309 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Add jobs to the worker pool's queue while holding the queue lock.

    Locked counterpart of L{_EnqueueJobsUnlocked}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    result = self._EnqueueJobsUnlocked(jobs)
    return result
2318 75d81fc8 Michael Hanselmann
2319 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Add jobs to the worker pool's queue; the caller must hold the lock.

    Each job becomes one task, submitted with the job's calculated priority
    and with the value returned by C{_GetIdAttr} for the job as task ID.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    task_args = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]
    task_ids = map(_GetIdAttr, jobs)

    self._wpool.AddManyTasks(task_args, priority=priorities, task_id=task_ids)
2330 7b5c4a69 Michael Hanselmann
2331 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    The job is always loaded read-only from disk (live or archived).

    @type job_id: int
    @param job_id: Job ID
    @return: the job's status as computed by its C{CalcStatus} method
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    # The existence check must come before anything touches the job object;
    # SafeLoadJobFromDisk returns None for missing or unreadable jobs and
    # callers are promised JobLost in that case, not an AttributeError.
    if not job:
      raise errors.JobLost("Job %s not found" % job_id)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    return job.CalcStatus()
2350 b95479a5 Michael Hanselmann
2351 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Write a job's current state to its file in the queue.

    Has to be called after a job was modified so that the change reaches
    the disk and, depending on C{replicate}, the other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # A job has an end timestamp if and only if it is finalized
      is_final = job.CalcStatus() in constants.JOBS_FINALIZED
      lacks_end = job.end_timestamp is None
      assert (is_final ^ lacks_end)
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())

    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2375 ac0930b9 Iustin Pop
2376 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Wait until a job changes or the timeout expires.

    The waiting itself is done by a L{_WaitForJobChangesHelper} instance,
    which is given the job's file path and a function that loads the job
    read-only from disk (live or archived).

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    read_only_loader = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                                      writable=False)

    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, read_only_loader,
                  fields, prev_job_info, prev_log_serial, timeout)
2407 dfe57c22 Michael Hanselmann
2408 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancel a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @return: the result of L{_ModifyJobUnlocked}

    """
    logging.info("Cancelling job %s", job_id)

    def _DoCancel(job):
      return job.Cancel()

    return self._ModifyJobUnlocked(job_id, _DoCancel)
2422 aebd0e4e Michael Hanselmann
2423 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Assign a new priority to a job.

    If the job accepts the change, the priority of its task in the worker
    pool is updated as well; a job that currently has no task in the pool
    is not treated as an error.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @return: the result of L{_ModifyJobUnlocked}
    @raise errors.GenericError: if the priority is not valid for submission

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      valid_prios = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, valid_prios))

    def _ApplyPriority(job):
      (changed, message) = job.ChangePriority(priority)

      if changed:
        # Keep the worker pool's view of the task in sync with the job
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (changed, message)

    return self._ModifyJobUnlocked(job_id, _ApplyPriority)
2453 4679547e Michael Hanselmann
2454 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Load a job, apply a modification and store the result.

    The job is written back through L{UpdateJobUnlocked} only if the
    modifying function reports success.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)
    @rtype: tuple; (boolean, string)
    @return: C{mod_fn}'s result, or a failure tuple if the job could not
      be loaded

    """
    job = self._LoadJobUnlocked(job_id)

    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (ok, message) = mod_fn(job)

    if ok:
      # For a job that just got finalized (e.g. cancelled) this is the last
      # write permitted; afterwards it may be archived at any time.
      self.UpdateJobUnlocked(job)

    return (ok, message)
2480 fbf0262f Michael Hanselmann
2481 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Jobs which are not finalized yet are skipped; the files of all others
    are renamed from the live job path to the archived job path in a single
    call to L{_RenameFilesUnlocked}.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
2519 78d12585 Michael Hanselmann
2520 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archive a single job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)

    if job:
      num_archived = self._ArchiveJobsUnlocked([job])
      return num_archived == 1

    logging.debug("Job %s not found", job_id)
    return False
2541 07cd723a Iustin Pop
2542 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. A job's age is taken from its end timestamp; for jobs
    without one, the start timestamp is used, and if that is missing too,
    the received timestamp. The special '-1' age will cause archival of all
    jobs (that are not running or queued).

    Jobs are examined until all have been looked at or until C{timeout}
    seconds have passed since the call started, and are archived in batches.

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: time budget in seconds for examining jobs
    @rtype: tuple; (int, int)
    @return: number of archived jobs and number of jobs which were not
      examined because the timeout was reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Only count the job as touched once we know it will be examined;
      # counting it before the timeout check would under-report the number
      # of jobs left over by one whenever the timeout is hit.
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is not None:
          job_age = job.end_timestamp
        elif job.start_timestamp is not None:
          job_age = job.start_timestamp
        else:
          job_age = job.received_timestamp

        # Only the first element of the timestamp is compared
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2597 07cd723a Iustin Pop
2598 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepare a job query and load the jobs it may refer to.

    If the query names specific job IDs, exactly those are loaded (a job
    that can't be loaded is kept with C{None} as its object); otherwise all
    job IDs of the queue are used and jobs that can't be loaded are dropped.

    @param fields: List of wanted fields
    @param qfilter: Query filter
    @rtype: tuple
    @return: the query object, a list of C{(job_id, job)} pairs and a
      boolean telling whether all jobs were listed

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are skipped by default; they are only considered when the
    # "archived" field is used, be it as a requested field or in the filter.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()
    list_all = (job_ids is None)

    if list_all:
      # Job files are added to and removed from the queue atomically, hence
      # the list of IDs can't be in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    loaded = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

      if list_all and job is None:
        continue

      loaded.append((job_id, job))

    return (qobj, loaded, list_all)
2624 e07f7f7a Michael Hanselmann
2625 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Query the jobs in the queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the response built by L{query.GetQueryResponse}, without
      sorting by name

    """
    prepared = self._Query(fields, qfilter)
    (qobj, job_ctx, _) = prepared

    return query.GetQueryResponse(qobj, job_ctx, sort_by_name=False)
2637 e07f7f7a Michael Hanselmann
2638 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list or None
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list with
        the requested fields

    """
    if job_ids is None:
      # The documented "None for all" case: don't filter by ID at all. A
      # None filter is what L{QueryJobs} documents as valid for L{_Query}.
      # Without this branch the conversion below raises TypeError on None.
      qfilter = None
    else:
      # backwards compat: IDs are converted to integers before filtering
      # (presumably older callers pass them as strings -- confirm)
      qfilter = qlang.MakeSimpleFilter("id", [int(jid) for jid in job_ids])

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2657 e2715f69 Michael Hanselmann
2658 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} (no running
    jobs left) L{Shutdown} can be called without interfering with any job.
    Queued and unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    This method may be called repeatedly; only the first call flips the
    "accepting jobs" flag and deactivates the workerpool, later calls just
    report whether tasks are still running.

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
2682 6d5ea385 Michael Hanselmann
2683 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Tells whether the queue still accepts jobs.

    After L{PrepareShutdown} was called the queue is shutting down and new
    jobs are no longer accepted. This method only reads the flag; unlike
    L{PrepareShutdown} it isn't wrapped in a lock-acquiring decorator.

    @rtype: bool
    @return: Current value of the "accepting jobs" flag

    """
    accepting = self._accepting_jobs
    return accepting
2693 942e2262 Michael Hanselmann
2694 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue: the
    workerpool's workers are terminated first, then the queue file lock is
    closed and the reference to it is dropped.

    """
    self._wpool.TerminateWorkers()

    # Close the lock before forgetting about it; the attribute is reset to
    # None only after closing succeeded
    queue_lock = self._queue_filelock
    queue_lock.Close()
    self._queue_filelock = None