Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 3c631ea2

History | View | Annotate | Download (78.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 76b62028 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 f1da30e6 Michael Hanselmann
from ganeti import opcodes
52 7a1ecaed Iustin Pop
from ganeti import errors
53 e2715f69 Michael Hanselmann
from ganeti import mcpu
54 7996a135 Iustin Pop
from ganeti import utils
55 04ab05ce Michael Hanselmann
from ganeti import jstore
56 c3f0a12f Iustin Pop
from ganeti import rpc
57 82b22e19 René Nussbaumer
from ganeti import runtime
58 a744b676 Manuel Franceschini
from ganeti import netutils
59 989a8bee Michael Hanselmann
from ganeti import compat
60 b95479a5 Michael Hanselmann
from ganeti import ht
61 a06c6ae8 Michael Hanselmann
from ganeti import query
62 a06c6ae8 Michael Hanselmann
from ganeti import qlang
63 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
64 cffbbae7 Michael Hanselmann
from ganeti import vcluster
65 e2715f69 Michael Hanselmann
66 fbf0262f Michael Hanselmann
67 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
68 e2715f69 Michael Hanselmann
69 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
70 ebb80afa Guido Trotter
_LOCK = "_lock"
71 ebb80afa Guido Trotter
_QUEUE = "_queue"
72 99bd4f0a Guido Trotter
73 99fb250b Michael Hanselmann
#: Retrieves "id" attribute
74 99fb250b Michael Hanselmann
_GetIdAttr = operator.attrgetter("id")
75 99fb250b Michael Hanselmann
76 498ae1cc Iustin Pop
77 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Special exception raised in order to cancel a job.

  """
81 fbf0262f Michael Hanselmann
82 fbf0262f Michael Hanselmann
83 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Special exception raised to abort a job while the queue shuts down.

  """
87 942e2262 Michael Hanselmann
88 942e2262 Michael Hanselmann
89 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the timestamp of the current moment.

  The wall-clock time is read via C{time.time()} and split by
  L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
97 70552c46 Michael Hanselmann
98 70552c46 Michael Hanselmann
99 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
  """Updates a job queue file after virtualizing its filename.

  @param runner: object providing C{call_jobqueue_update}
  @param names: passed through unchanged as first argument of the call
  @param file_name: file name; converted using L{vcluster.MakeVirtualPath}
  @param content: passed through unchanged as last argument of the call
  @return: whatever C{runner.call_jobqueue_update} returns

  """
  return runner.call_jobqueue_update(names,
                                     vcluster.MakeVirtualPath(file_name),
                                     content)
105 cffbbae7 Michael Hanselmann
106 cffbbae7 Michael Hanselmann
107 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Callable wrapper around a job query.

  The query object for the requested fields is created once and kept on the
  instance, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Builds and stores the query object.

    @param fields: names of the job fields to query

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Runs the stored query against a single job.

    @param job: job object; its C{id} attribute is used as the entry name
    @return: first row of the old-style query result

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
124 a06c6ae8 Michael Hanselmann
125 a06c6ae8 Michael Hanselmann
126 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
127 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
128 e2715f69 Michael Hanselmann

129 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
130 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
131 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
132 ea03467c Iustin Pop
  @ivar status: the current status
133 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
134 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
135 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
137 f1048938 Iustin Pop

138 e2715f69 Michael Hanselmann
  """
139 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
140 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
141 66d895a8 Iustin Pop
               "__weakref__"]
142 66d895a8 Iustin Pop
143 85f03e0d Michael Hanselmann
  def __init__(self, op):
144 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
145 ea03467c Iustin Pop

146 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
147 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
148 ea03467c Iustin Pop

149 ea03467c Iustin Pop
    """
150 85f03e0d Michael Hanselmann
    self.input = op
151 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
152 85f03e0d Michael Hanselmann
    self.result = None
153 85f03e0d Michael Hanselmann
    self.log = []
154 70552c46 Michael Hanselmann
    self.start_timestamp = None
155 b9b5abcb Iustin Pop
    self.exec_timestamp = None
156 70552c46 Michael Hanselmann
    self.end_timestamp = None
157 f1da30e6 Michael Hanselmann
158 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
159 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160 8f5c488d Michael Hanselmann
161 f1da30e6 Michael Hanselmann
  @classmethod
162 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
163 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
164 ea03467c Iustin Pop

165 ea03467c Iustin Pop
    @type state: dict
166 ea03467c Iustin Pop
    @param state: the serialized state
167 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
168 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
169 ea03467c Iustin Pop

170 ea03467c Iustin Pop
    """
171 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
172 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173 85f03e0d Michael Hanselmann
    obj.status = state["status"]
174 85f03e0d Michael Hanselmann
    obj.result = state["result"]
175 85f03e0d Michael Hanselmann
    obj.log = state["log"]
176 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
177 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
178 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
179 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180 f1da30e6 Michael Hanselmann
    return obj
181 f1da30e6 Michael Hanselmann
182 f1da30e6 Michael Hanselmann
  def Serialize(self):
183 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
184 ea03467c Iustin Pop

185 ea03467c Iustin Pop
    @rtype: dict
186 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
187 ea03467c Iustin Pop

188 ea03467c Iustin Pop
    """
189 6c5a7090 Michael Hanselmann
    return {
190 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
191 6c5a7090 Michael Hanselmann
      "status": self.status,
192 6c5a7090 Michael Hanselmann
      "result": self.result,
193 6c5a7090 Michael Hanselmann
      "log": self.log,
194 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
195 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
196 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
197 8f5c488d Michael Hanselmann
      "priority": self.priority,
198 6c5a7090 Michael Hanselmann
      }
199 f1048938 Iustin Pop
200 e2715f69 Michael Hanselmann
201 e2715f69 Michael Hanselmann
class _QueuedJob(object):
202 e2715f69 Michael Hanselmann
  """In-memory job representation.
203 e2715f69 Michael Hanselmann

204 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
205 ea03467c Iustin Pop
  be taken care of by users of this class.
206 ea03467c Iustin Pop

207 ea03467c Iustin Pop
  @type queue: L{JobQueue}
208 ea03467c Iustin Pop
  @ivar queue: the parent queue
209 ea03467c Iustin Pop
  @ivar id: the job ID
210 ea03467c Iustin Pop
  @type ops: list
211 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
212 ea03467c Iustin Pop
  @type log_serial: int
213 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
214 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
215 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
216 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
217 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
218 e2715f69 Michael Hanselmann

219 e2715f69 Michael Hanselmann
  """
220 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
221 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
223 8a3cd185 Michael Hanselmann
               "__weakref__", "processor_lock", "writable", "archived"]
224 66d895a8 Iustin Pop
225 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
226 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
227 ea03467c Iustin Pop

228 ea03467c Iustin Pop
    @type queue: L{JobQueue}
229 ea03467c Iustin Pop
    @param queue: our parent queue
230 ea03467c Iustin Pop
    @type job_id: job_id
231 ea03467c Iustin Pop
    @param job_id: our job id
232 ea03467c Iustin Pop
    @type ops: list
233 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
234 ea03467c Iustin Pop
        in _QueuedOpCodes
235 c0f6d0d8 Michael Hanselmann
    @type writable: bool
236 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
237 ea03467c Iustin Pop

238 ea03467c Iustin Pop
    """
239 e2715f69 Michael Hanselmann
    if not ops:
240 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
241 e2715f69 Michael Hanselmann
242 85f03e0d Michael Hanselmann
    self.queue = queue
243 76b62028 Iustin Pop
    self.id = int(job_id)
244 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
245 6c5a7090 Michael Hanselmann
    self.log_serial = 0
246 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
247 c56ec146 Iustin Pop
    self.start_timestamp = None
248 c56ec146 Iustin Pop
    self.end_timestamp = None
249 8a3cd185 Michael Hanselmann
    self.archived = False
250 6c5a7090 Michael Hanselmann
251 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
252 fa4aa6b4 Michael Hanselmann
253 8a3cd185 Michael Hanselmann
    assert not self.archived, "New jobs can not be marked as archived"
254 8a3cd185 Michael Hanselmann
255 fa4aa6b4 Michael Hanselmann
  @staticmethod
256 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
257 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
258 fa4aa6b4 Michael Hanselmann

259 fa4aa6b4 Michael Hanselmann
    """
260 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
261 03b63608 Michael Hanselmann
    obj.ops_iter = None
262 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
263 f8a4adfa Michael Hanselmann
264 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
265 f8a4adfa Michael Hanselmann
    if writable:
266 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
267 f8a4adfa Michael Hanselmann
    else:
268 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
269 be760ba8 Michael Hanselmann
270 9fa2e150 Michael Hanselmann
  def __repr__(self):
271 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
272 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
273 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
274 9fa2e150 Michael Hanselmann
275 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
276 9fa2e150 Michael Hanselmann
277 f1da30e6 Michael Hanselmann
  @classmethod
278 8a3cd185 Michael Hanselmann
  def Restore(cls, queue, state, writable, archived):
279 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
280 ea03467c Iustin Pop

281 ea03467c Iustin Pop
    @type queue: L{JobQueue}
282 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
283 ea03467c Iustin Pop
    @type state: dict
284 ea03467c Iustin Pop
    @param state: the serialized state
285 c0f6d0d8 Michael Hanselmann
    @type writable: bool
286 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
287 8a3cd185 Michael Hanselmann
    @type archived: bool
288 8a3cd185 Michael Hanselmann
    @param archived: Whether job was already archived
289 ea03467c Iustin Pop
    @rtype: _JobQueue
290 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
291 ea03467c Iustin Pop

292 ea03467c Iustin Pop
    """
293 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
294 85f03e0d Michael Hanselmann
    obj.queue = queue
295 76b62028 Iustin Pop
    obj.id = int(state["id"])
296 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
297 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
298 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
299 8a3cd185 Michael Hanselmann
    obj.archived = archived
300 6c5a7090 Michael Hanselmann
301 6c5a7090 Michael Hanselmann
    obj.ops = []
302 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
303 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
304 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
305 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
306 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
307 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
308 6c5a7090 Michael Hanselmann
309 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
310 be760ba8 Michael Hanselmann
311 f1da30e6 Michael Hanselmann
    return obj
312 f1da30e6 Michael Hanselmann
313 f1da30e6 Michael Hanselmann
  def Serialize(self):
314 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
315 ea03467c Iustin Pop

316 ea03467c Iustin Pop
    @rtype: dict
317 ea03467c Iustin Pop
    @return: the serialized state
318 ea03467c Iustin Pop

319 ea03467c Iustin Pop
    """
320 f1da30e6 Michael Hanselmann
    return {
321 f1da30e6 Michael Hanselmann
      "id": self.id,
322 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
323 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
324 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
325 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
326 f1da30e6 Michael Hanselmann
      }
327 f1da30e6 Michael Hanselmann
328 85f03e0d Michael Hanselmann
  def CalcStatus(self):
329 ea03467c Iustin Pop
    """Compute the status of this job.
330 ea03467c Iustin Pop

331 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
332 ea03467c Iustin Pop
    based on their status, computes the job status.
333 ea03467c Iustin Pop

334 ea03467c Iustin Pop
    The algorithm is:
335 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
336 ea03467c Iustin Pop
        status will be the same
337 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
338 ea03467c Iustin Pop
          - waitlock
339 fbf0262f Michael Hanselmann
          - canceling
340 ea03467c Iustin Pop
          - running
341 ea03467c Iustin Pop

342 ea03467c Iustin Pop
        will determine the job status
343 ea03467c Iustin Pop

344 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
345 ea03467c Iustin Pop
        and the job status will be the same
346 ea03467c Iustin Pop

347 ea03467c Iustin Pop
    @return: the job status
348 ea03467c Iustin Pop

349 ea03467c Iustin Pop
    """
350 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
351 e2715f69 Michael Hanselmann
352 e2715f69 Michael Hanselmann
    all_success = True
353 85f03e0d Michael Hanselmann
    for op in self.ops:
354 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
355 e2715f69 Michael Hanselmann
        continue
356 e2715f69 Michael Hanselmann
357 e2715f69 Michael Hanselmann
      all_success = False
358 e2715f69 Michael Hanselmann
359 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
360 e2715f69 Michael Hanselmann
        pass
361 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
362 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
363 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
364 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
365 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
366 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
367 fbf0262f Michael Hanselmann
        break
368 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
369 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
370 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
371 f1da30e6 Michael Hanselmann
        break
372 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
373 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
374 4cb1d919 Michael Hanselmann
        break
375 e2715f69 Michael Hanselmann
376 e2715f69 Michael Hanselmann
    if all_success:
377 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
378 e2715f69 Michael Hanselmann
379 e2715f69 Michael Hanselmann
    return status
380 e2715f69 Michael Hanselmann
381 8f5c488d Michael Hanselmann
  def CalcPriority(self):
382 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
383 8f5c488d Michael Hanselmann

384 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
385 8f5c488d Michael Hanselmann
    priority is used.
386 8f5c488d Michael Hanselmann

387 8f5c488d Michael Hanselmann
    @rtype: int
388 8f5c488d Michael Hanselmann

389 8f5c488d Michael Hanselmann
    """
390 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
391 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
392 8f5c488d Michael Hanselmann
393 8f5c488d Michael Hanselmann
    if not priorities:
394 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
395 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
396 8f5c488d Michael Hanselmann
397 8f5c488d Michael Hanselmann
    return min(priorities)
398 8f5c488d Michael Hanselmann
399 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
400 ea03467c Iustin Pop
    """Selectively returns the log entries.
401 ea03467c Iustin Pop

402 ea03467c Iustin Pop
    @type newer_than: None or int
403 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
404 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
405 ea03467c Iustin Pop
        than this value
406 ea03467c Iustin Pop
    @rtype: list
407 ea03467c Iustin Pop
    @return: the list of the log entries selected
408 ea03467c Iustin Pop

409 ea03467c Iustin Pop
    """
410 6c5a7090 Michael Hanselmann
    if newer_than is None:
411 6c5a7090 Michael Hanselmann
      serial = -1
412 6c5a7090 Michael Hanselmann
    else:
413 6c5a7090 Michael Hanselmann
      serial = newer_than
414 6c5a7090 Michael Hanselmann
415 6c5a7090 Michael Hanselmann
    entries = []
416 6c5a7090 Michael Hanselmann
    for op in self.ops:
417 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
418 6c5a7090 Michael Hanselmann
419 6c5a7090 Michael Hanselmann
    return entries
420 6c5a7090 Michael Hanselmann
421 6a290889 Guido Trotter
  def GetInfo(self, fields):
422 6a290889 Guido Trotter
    """Returns information about a job.
423 6a290889 Guido Trotter

424 6a290889 Guido Trotter
    @type fields: list
425 6a290889 Guido Trotter
    @param fields: names of fields to return
426 6a290889 Guido Trotter
    @rtype: list
427 6a290889 Guido Trotter
    @return: list with one element for each field
428 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
429 6a290889 Guido Trotter
        has been passed
430 6a290889 Guido Trotter

431 6a290889 Guido Trotter
    """
432 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
433 6a290889 Guido Trotter
434 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
435 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
436 34327f51 Iustin Pop

437 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
438 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
439 34327f51 Iustin Pop
    finalised are not changed.
440 34327f51 Iustin Pop

441 34327f51 Iustin Pop
    @param status: a given opcode status
442 34327f51 Iustin Pop
    @param result: the opcode result
443 34327f51 Iustin Pop

444 34327f51 Iustin Pop
    """
445 747f6113 Michael Hanselmann
    not_marked = True
446 747f6113 Michael Hanselmann
    for op in self.ops:
447 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
448 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
449 747f6113 Michael Hanselmann
        continue
450 747f6113 Michael Hanselmann
      op.status = status
451 747f6113 Michael Hanselmann
      op.result = result
452 747f6113 Michael Hanselmann
      not_marked = False
453 34327f51 Iustin Pop
454 66bd7445 Michael Hanselmann
  def Finalize(self):
455 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
456 66bd7445 Michael Hanselmann

457 66bd7445 Michael Hanselmann
    """
458 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
459 66bd7445 Michael Hanselmann
460 099b2870 Michael Hanselmann
  def Cancel(self):
461 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
462 a0d2fe2c Michael Hanselmann

463 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
464 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
465 a0d2fe2c Michael Hanselmann
      as canceling and a text message
466 a0d2fe2c Michael Hanselmann

467 a0d2fe2c Michael Hanselmann
    """
468 099b2870 Michael Hanselmann
    status = self.CalcStatus()
469 099b2870 Michael Hanselmann
470 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
471 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
472 099b2870 Michael Hanselmann
                             "Job canceled by request")
473 66bd7445 Michael Hanselmann
      self.Finalize()
474 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
475 099b2870 Michael Hanselmann
476 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
477 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
478 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
479 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
480 099b2870 Michael Hanselmann
481 86b16e9d Michael Hanselmann
    else:
482 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
483 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
484 099b2870 Michael Hanselmann
485 4679547e Michael Hanselmann
  def ChangePriority(self, priority):
486 4679547e Michael Hanselmann
    """Changes the job priority.
487 4679547e Michael Hanselmann

488 4679547e Michael Hanselmann
    @type priority: int
489 4679547e Michael Hanselmann
    @param priority: New priority
490 4679547e Michael Hanselmann
    @rtype: tuple; (bool, string)
491 4679547e Michael Hanselmann
    @return: Boolean describing whether job's priority was successfully changed
492 4679547e Michael Hanselmann
      and a text message
493 4679547e Michael Hanselmann

494 4679547e Michael Hanselmann
    """
495 4679547e Michael Hanselmann
    status = self.CalcStatus()
496 4679547e Michael Hanselmann
497 4679547e Michael Hanselmann
    if status in constants.JOBS_FINALIZED:
498 4679547e Michael Hanselmann
      return (False, "Job %s is finished" % self.id)
499 4679547e Michael Hanselmann
    elif status == constants.JOB_STATUS_CANCELING:
500 4679547e Michael Hanselmann
      return (False, "Job %s is cancelling" % self.id)
501 4679547e Michael Hanselmann
    else:
502 4679547e Michael Hanselmann
      assert status in (constants.JOB_STATUS_QUEUED,
503 4679547e Michael Hanselmann
                        constants.JOB_STATUS_WAITING,
504 4679547e Michael Hanselmann
                        constants.JOB_STATUS_RUNNING)
505 4679547e Michael Hanselmann
506 4679547e Michael Hanselmann
      changed = False
507 4679547e Michael Hanselmann
      for op in self.ops:
508 4679547e Michael Hanselmann
        if (op.status == constants.OP_STATUS_RUNNING or
509 4679547e Michael Hanselmann
            op.status in constants.OPS_FINALIZED):
510 4679547e Michael Hanselmann
          assert not changed, \
511 4679547e Michael Hanselmann
            ("Found opcode for which priority should not be changed after"
512 4679547e Michael Hanselmann
             " priority has been changed for previous opcodes")
513 4679547e Michael Hanselmann
          continue
514 4679547e Michael Hanselmann
515 4679547e Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
516 4679547e Michael Hanselmann
                             constants.OP_STATUS_WAITING)
517 4679547e Michael Hanselmann
518 4679547e Michael Hanselmann
        changed = True
519 4679547e Michael Hanselmann
520 3c631ea2 Michael Hanselmann
        # Set new priority (doesn't modify opcode input)
521 4679547e Michael Hanselmann
        op.priority = priority
522 4679547e Michael Hanselmann
523 4679547e Michael Hanselmann
      if changed:
524 4679547e Michael Hanselmann
        return (True, ("Priorities of pending opcodes for job %s have been"
525 4679547e Michael Hanselmann
                       " changed to %s" % (self.id, priority)))
526 4679547e Michael Hanselmann
      else:
527 4679547e Michael Hanselmann
        return (False, "Job %s had no pending opcodes" % self.id)
528 4679547e Michael Hanselmann
529 f1048938 Iustin Pop
530 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callback object handed to the opcode execution for one queued opcode.

  """
  def __init__(self, queue, job, op):
    """Remembers the queue, job and opcode the callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Stops processing if the opcode is canceling or the queue shuts down.

    @raise CancelJob: if the opcode status is "canceling"
    @raise QueueShutdown: if the queue doesn't accept jobs anymore

    """
    # Cancellation request has precedence over a queue shutdown
    cancel_requested = (self._op.status == constants.OP_STATUS_CANCELING)
    if cancel_requested:
      logging.debug("Canceling opcode")
      raise CancelJob()

    if self._queue.AcceptingJobsUnlocked():
      return

    logging.debug("Queue is shutting down")
    raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from waiting for locks to running.

    The mcpu code calls this as a notifier function right before the LU's
    Exec() method is started. For the status change to be visible to the
    end-user, the opcode must have been set to OP_STATUS_WAITING before
    calling into Processor.ExecOpCode.

    """
    opcode = self._op

    assert opcode in self._job.ops
    assert opcode.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING)

    # Last chance to cancel before the opcode is marked as running
    self._CheckCancel()

    logging.debug("Opcode is now running")

    opcode.status = constants.OP_STATUS_RUNNING
    opcode.exec_timestamp = TimeStampNow()

    # Write the changed job via the queue
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds a log entry to the opcode while holding the queue lock.

    The job's log serial is incremented and used for the new entry; the job
    is then updated in the queue with C{replicate=False}.

    """
    job = self._job
    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Appends a log entry for the opcode.

    Takes either a single argument (the message, logged with type
    C{constants.ELOG_MESSAGE}) or two arguments (log type and message).

    """
    assert len(args) < 3

    if len(args) != 1:
      (log_type, log_msg) = args
    else:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])

    # Splitting the time makes serialization easier and avoids losing
    # precision.
    self._AppendFeedback(utils.SplitTime(time.time()), log_type, log_msg)

  def CurrentPriority(self):
    """Gives the opcode's current priority.

    Also acts as a cancellation point, see L{_CheckCancel}.

    """
    opcode = self._op

    assert opcode.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING)

    # Abort here if cancellation was requested
    self._CheckCancel()

    return opcode.priority

  def SubmitManyJobs(self, jobs):
    """Passes jobs on to the queue for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue takes care of it
    queue = self._queue
    return queue.SubmitManyJobs(jobs)
635 6a373640 Michael Hanselmann
636 031a3e57 Michael Hanselmann
637 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable comparing a job against previously reported information.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Stores the reference data to compare jobs against.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    (self._prev_job_info, self._prev_log_serial) = \
      (prev_job_info, prev_log_serial)

  def __call__(self, job):
    """Determines whether a job differs from the stored reference data.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: Tuple of (job info, log entries) if the job has changed or is no
      longer queued, waiting or running; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    raw_info = self._squery(job)
    raw_log = job.GetLogEntries(self._prev_log_serial)

    # The data received from the client went through serialization, which
    # can change types (e.g. tuple to list) or lose precision. Our own data
    # is passed through the same conversion, otherwise the comparison below
    # might fail without the data being significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(raw_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(raw_log))

    # A job which is no longer queued, waiting or running won't change
    # anymore, hence it's reported right away instead of being waited for.
    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_RUNNING,
                  constants.JOB_STATUS_WAITING):
      if not (job_info != self._prev_job_info or
              (log_entries and
               self._prev_log_serial != log_entries[0][0])):
        return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
689 989a8bee Michael Hanselmann
690 989a8bee Michael Hanselmann
691 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
692 989a8bee Michael Hanselmann
  def __init__(self, filename):
693 989a8bee Michael Hanselmann
    """Initializes this class.
694 989a8bee Michael Hanselmann

695 989a8bee Michael Hanselmann
    @type filename: string
696 989a8bee Michael Hanselmann
    @param filename: Path to job file
697 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
698 6c2549d6 Guido Trotter

699 989a8bee Michael Hanselmann
    """
700 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
701 989a8bee Michael Hanselmann
    self._inotify_handler = \
702 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
703 989a8bee Michael Hanselmann
    self._notifier = \
704 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
705 989a8bee Michael Hanselmann
    try:
706 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
707 989a8bee Michael Hanselmann
    except Exception:
708 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
709 989a8bee Michael Hanselmann
      self._notifier.stop()
710 989a8bee Michael Hanselmann
      raise
711 989a8bee Michael Hanselmann
712 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
713 989a8bee Michael Hanselmann
    """Callback for inotify.
714 989a8bee Michael Hanselmann

715 989a8bee Michael Hanselmann
    """
716 6c2549d6 Guido Trotter
    if not notifier_enabled:
717 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
718 989a8bee Michael Hanselmann
719 989a8bee Michael Hanselmann
  def Wait(self, timeout):
720 989a8bee Michael Hanselmann
    """Waits for the job file to change.
721 989a8bee Michael Hanselmann

722 989a8bee Michael Hanselmann
    @type timeout: float
723 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
724 989a8bee Michael Hanselmann
    @return: Whether there have been events
725 989a8bee Michael Hanselmann

726 989a8bee Michael Hanselmann
    """
727 989a8bee Michael Hanselmann
    assert timeout >= 0
728 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
729 989a8bee Michael Hanselmann
    if have_events:
730 989a8bee Michael Hanselmann
      self._notifier.read_events()
731 989a8bee Michael Hanselmann
    self._notifier.process_events()
732 989a8bee Michael Hanselmann
    return have_events
733 989a8bee Michael Hanselmann
734 989a8bee Michael Hanselmann
  def Close(self):
735 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
736 989a8bee Michael Hanselmann

737 989a8bee Michael Hanselmann
    """
738 989a8bee Michael Hanselmann
    self._notifier.stop()
739 989a8bee Michael Hanselmann
740 989a8bee Michael Hanselmann
741 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
742 989a8bee Michael Hanselmann
  def __init__(self, filename):
743 989a8bee Michael Hanselmann
    """Initializes this class.
744 989a8bee Michael Hanselmann

745 989a8bee Michael Hanselmann
    @type filename: string
746 989a8bee Michael Hanselmann
    @param filename: Path to job file
747 989a8bee Michael Hanselmann

748 989a8bee Michael Hanselmann
    """
749 989a8bee Michael Hanselmann
    self._filewaiter = None
750 989a8bee Michael Hanselmann
    self._filename = filename
751 6c2549d6 Guido Trotter
752 989a8bee Michael Hanselmann
  def Wait(self, timeout):
753 989a8bee Michael Hanselmann
    """Waits for a job to change.
754 6c2549d6 Guido Trotter

755 989a8bee Michael Hanselmann
    @type timeout: float
756 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
757 989a8bee Michael Hanselmann
    @return: Whether there have been events
758 989a8bee Michael Hanselmann

759 989a8bee Michael Hanselmann
    """
760 989a8bee Michael Hanselmann
    if self._filewaiter:
761 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
762 989a8bee Michael Hanselmann
763 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
764 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
765 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
766 989a8bee Michael Hanselmann
    # race condition.
767 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
768 989a8bee Michael Hanselmann
769 989a8bee Michael Hanselmann
    return True
770 989a8bee Michael Hanselmann
771 989a8bee Michael Hanselmann
  def Close(self):
772 989a8bee Michael Hanselmann
    """Closes underlying waiter.
773 989a8bee Michael Hanselmann

774 989a8bee Michael Hanselmann
    """
775 989a8bee Michael Hanselmann
    if self._filewaiter:
776 989a8bee Michael Hanselmann
      self._filewaiter.Close()
777 989a8bee Michael Hanselmann
778 989a8bee Michael Hanselmann
779 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
780 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
781 989a8bee Michael Hanselmann

782 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
783 989a8bee Michael Hanselmann
  the current job status has changed.
784 989a8bee Michael Hanselmann

785 989a8bee Michael Hanselmann
  """
786 989a8bee Michael Hanselmann
  @staticmethod
787 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
788 dfc8824a Michael Hanselmann
    if counter.next() > 0:
789 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
790 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
791 dfc8824a Michael Hanselmann
      # changes/messages.
792 dfc8824a Michael Hanselmann
      time.sleep(0.1)
793 dfc8824a Michael Hanselmann
794 989a8bee Michael Hanselmann
    job = job_load_fn()
795 989a8bee Michael Hanselmann
    if not job:
796 989a8bee Michael Hanselmann
      raise errors.JobLost()
797 989a8bee Michael Hanselmann
798 989a8bee Michael Hanselmann
    result = check_fn(job)
799 989a8bee Michael Hanselmann
    if result is None:
800 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
801 989a8bee Michael Hanselmann
802 989a8bee Michael Hanselmann
    return result
803 989a8bee Michael Hanselmann
804 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
805 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
806 989a8bee Michael Hanselmann
    """Waits for changes on a job.
807 989a8bee Michael Hanselmann

808 989a8bee Michael Hanselmann
    @type filename: string
809 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
810 989a8bee Michael Hanselmann
    @type job_load_fn: callable
811 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
812 989a8bee Michael Hanselmann
    @type fields: list of strings
813 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
814 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
815 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
816 989a8bee Michael Hanselmann
    @type prev_log_serial: int
817 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
818 989a8bee Michael Hanselmann
    @type timeout: float
819 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
820 989a8bee Michael Hanselmann

821 989a8bee Michael Hanselmann
    """
822 dfc8824a Michael Hanselmann
    counter = itertools.count()
823 6c2549d6 Guido Trotter
    try:
824 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
825 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
826 989a8bee Michael Hanselmann
      try:
827 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
828 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
829 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
830 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
831 989a8bee Michael Hanselmann
      finally:
832 989a8bee Michael Hanselmann
        waiter.Close()
833 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
834 6c2549d6 Guido Trotter
      return None
835 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
836 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
837 6c2549d6 Guido Trotter
838 6c2549d6 Guido Trotter
839 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Anything which isn't an L{errors.GenericError} is wrapped, using its string
  representation, in an L{errors.OpExecError} before being encoded.

  @param err: The error to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
849 6760e4ed Michael Hanselmann
850 6760e4ed Michael Hanselmann
851 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
852 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
853 26d3fd2f Michael Hanselmann
    """Initializes this class.
854 26d3fd2f Michael Hanselmann

855 26d3fd2f Michael Hanselmann
    """
856 26d3fd2f Michael Hanselmann
    self._fn = fn
857 26d3fd2f Michael Hanselmann
    self._next = None
858 26d3fd2f Michael Hanselmann
859 26d3fd2f Michael Hanselmann
  def _Advance(self):
860 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
861 26d3fd2f Michael Hanselmann

862 26d3fd2f Michael Hanselmann
    """
863 26d3fd2f Michael Hanselmann
    if self._next is None:
864 26d3fd2f Michael Hanselmann
      self._next = self._fn()
865 26d3fd2f Michael Hanselmann
866 26d3fd2f Michael Hanselmann
  def Peek(self):
867 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
868 26d3fd2f Michael Hanselmann

869 26d3fd2f Michael Hanselmann
    """
870 26d3fd2f Michael Hanselmann
    self._Advance()
871 26d3fd2f Michael Hanselmann
    return self._next
872 26d3fd2f Michael Hanselmann
873 26d3fd2f Michael Hanselmann
  def Next(self):
874 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
875 26d3fd2f Michael Hanselmann

876 26d3fd2f Michael Hanselmann
    """
877 26d3fd2f Michael Hanselmann
    self._Advance()
878 26d3fd2f Michael Hanselmann
    result = self._next
879 26d3fd2f Michael Hanselmann
    self._next = None
880 26d3fd2f Michael Hanselmann
    return result
881 26d3fd2f Michael Hanselmann
882 26d3fd2f Michael Hanselmann
883 b80cc518 Michael Hanselmann
class _OpExecContext:
884 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
885 b80cc518 Michael Hanselmann
    """Initializes this class.
886 b80cc518 Michael Hanselmann

887 b80cc518 Michael Hanselmann
    """
888 b80cc518 Michael Hanselmann
    self.op = op
889 b80cc518 Michael Hanselmann
    self.index = index
890 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
891 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
892 b80cc518 Michael Hanselmann
893 b95479a5 Michael Hanselmann
    # Create local copy to modify
894 b95479a5 Michael Hanselmann
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
895 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
896 b95479a5 Michael Hanselmann
    else:
897 b95479a5 Michael Hanselmann
      self.jobdeps = None
898 b95479a5 Michael Hanselmann
899 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
900 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
901 26d3fd2f Michael Hanselmann
902 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
903 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
904 26d3fd2f Michael Hanselmann

905 26d3fd2f Michael Hanselmann
    """
906 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
907 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
908 26d3fd2f Michael Hanselmann
909 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
910 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
911 26d3fd2f Michael Hanselmann

912 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
913 26d3fd2f Michael Hanselmann

914 26d3fd2f Michael Hanselmann
    """
915 26d3fd2f Michael Hanselmann
    op = self.op
916 26d3fd2f Michael Hanselmann
917 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
918 26d3fd2f Michael Hanselmann
    # for locks?
919 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
920 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
921 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
922 26d3fd2f Michael Hanselmann
      op.priority -= 1
923 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
924 26d3fd2f Michael Hanselmann
      return True
925 26d3fd2f Michael Hanselmann
926 26d3fd2f Michael Hanselmann
    return False
927 26d3fd2f Michael Hanselmann
928 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
929 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
930 26d3fd2f Michael Hanselmann

931 26d3fd2f Michael Hanselmann
    """
932 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
933 26d3fd2f Michael Hanselmann
934 b80cc518 Michael Hanselmann
935 be760ba8 Michael Hanselmann
class _JobProcessor(object):
936 75d81fc8 Michael Hanselmann
  (DEFER,
937 75d81fc8 Michael Hanselmann
   WAITDEP,
938 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
939 75d81fc8 Michael Hanselmann
940 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores the objects needed for processing a job.

    @param queue: Job queue
    @param opexec_fn: Function called to execute an opcode
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout strategy

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
949 be760ba8 Michael Hanselmann
950 be760ba8 Michael Hanselmann
  @staticmethod
951 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Finds the first opcode of a job which isn't finalized yet.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Execution context for the opcode found
    @raise errors.ProgrammerError: if there are no opcodes left or an opcode
      is marked as running

    """
    # The iterator is kept on the job, acting as some sort of a cache which
    # speeds up locating the next opcode on future lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    ops_iter = job.ops_iter
    if ops_iter is None:
      ops_iter = job.ops_iter = enumerate(job.ops)

    # Walk through opcodes until one to run is found
    while True:
      try:
        (position, queued_op) = ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if queued_op.status == constants.OP_STATUS_RUNNING:
        # An opcode must not be marked as running at this point
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (position + 1, len(job.ops))
      opctx = _OpExecContext(queued_op, position, log_prefix,
                             timeout_strategy_factory)

      if queued_op.status in constants.OPS_FINALIZED:
        # The job was partially completed before the master daemon was shut
        # down, hence some opcodes can be expected to be completed
        # successfully already (had any of them failed, the whole job should
        # have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx
989 be760ba8 Michael Hanselmann
990 be760ba8 Michael Hanselmann
  @staticmethod
991 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
    """Puts an opcode into the "waiting" status.

    Missing start timestamps of the opcode and the job are filled in as well.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or a timestamp was changed

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    was_queued = (op.status == constants.OP_STATUS_QUEUED)
    if was_queued:
      op.status = constants.OP_STATUS_WAITING

    op_not_started = op.start_timestamp is None
    if op_not_started:
      op.start_timestamp = TimeStampNow()

    # The job's start timestamp is taken from the opcode
    job_not_started = job.start_timestamp is None
    if job_not_started:
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return was_queued or op_not_started or job_not_started
1025 be760ba8 Michael Hanselmann
1026 b95479a5 Michael Hanselmann
  @staticmethod
1027 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
    """Processes the dependencies of an opcode, if there are any.

    Dependencies are checked in order; fulfilled ones are removed from the
    context's list. Processing stops at the first dependency which isn't
    fulfilled.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Dependency is fulfilled, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if depresult == _JobDependencyManager.WAIT:
        # The dependency tracker re-adds the job to the workerpool once it
        # got the notification we have to wait for
        return True

      if depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (_JobDependencyManager.WRONGSTATUS,
                       _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No dependencies (left) to wait for
    return False
1081 b95479a5 Michael Hanselmann
1082 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1083 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1084 be760ba8 Michael Hanselmann

1085 be760ba8 Michael Hanselmann
    """
1086 b80cc518 Michael Hanselmann
    op = opctx.op
1087 b80cc518 Michael Hanselmann
1088 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1089 be760ba8 Michael Hanselmann
1090 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1091 26d3fd2f Michael Hanselmann
1092 be760ba8 Michael Hanselmann
    try:
1093 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1094 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1095 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1096 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1097 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1098 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1099 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1100 9e49dfc5 Michael Hanselmann
1101 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1102 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1103 9e49dfc5 Michael Hanselmann
1104 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1105 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1106 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1107 9e49dfc5 Michael Hanselmann
1108 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1109 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1110 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1111 942e2262 Michael Hanselmann
1112 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1113 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1114 be760ba8 Michael Hanselmann
    except CancelJob:
1115 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1116 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1117 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1118 942e2262 Michael Hanselmann
1119 942e2262 Michael Hanselmann
    except QueueShutdown:
1120 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1121 942e2262 Michael Hanselmann
1122 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1123 942e2262 Michael Hanselmann
1124 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1125 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1126 942e2262 Michael Hanselmann
1127 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1128 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1129 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1130 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1131 be760ba8 Michael Hanselmann
    else:
1132 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1133 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1134 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1135 be760ba8 Michael Hanselmann
1136 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    Each invocation handles one opcode of the job: either the one remembered
    in C{job.cur_opctx} from an earlier call or the next one found by
    L{_FindNextOpcode}. The queue lock is held in shared mode for the whole
    call, except while the opcode itself runs in L{_ExecOpCodeUnlocked}; it
    is always released again before returning, even if a consistency check
    fails.

    @param _nextop_fn: Callback function for tests; called without arguments
      before a new opcode is looked up (only if C{__debug__} is set)
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check: opcodes after the current one must not have been
      # touched yet
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      # Set to the result of the dependency check further down; a true value
      # means the dependency manager is going to re-schedule this job
      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        # Only run the opcode if there's no dependency to wait for and the
        # dependency check didn't cancel or fail it
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          # The queue lock is given up while the opcode runs and taken again
          # afterwards, no matter how the execution ended
          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          # All opcodes not yet finished are marked as failed, too
          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      try:
        assert job.writable, "Job became read-only while being processed"
      finally:
        # The queue lock must be given up even if the check above fails;
        # otherwise it would stay acquired by this thread
        queue.release()
1329 be760ba8 Michael Hanselmann
1330 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts upon the value returned by a L{_JobProcessor} run.

  Meant to be called from a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    for finished jobs
  @param job: Job which was processed
  @param result: Value returned by the job processor
  @raise workerpool.DeferTask: if the job needs to be scheduled again
  @raise errors.ProgrammerError: if C{result} is not a known value

  """
  if result == _JobProcessor.FINISHED:
    # Job is done, let the jobs waiting for it know
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Ask the worker pool to run the job again, using the priority computed
    # by the job
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do, re-scheduling is up to the dependency manager
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1351 df5a5730 Michael Hanselmann
1352 df5a5730 Michael Hanselmann
1353 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker class processing jobs handed out by the worker pool.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding its processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # A job can be in the task list more than once: if it registers for a
    # dependency and the dependency notifies before the first worker has
    # finished, the job is added again. The per-job lock makes sure only one
    # worker is active on it at any time.
    proc_lock = job.processor_lock
    proc_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      proc_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    The per-job lock must be held by the caller.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Sets the task name for the job and, if given, the opcode."""
      return self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcode execution goes through a wrapper updating the task name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                                    proc.ExecOpCode)

    processor = _JobProcessor(queue, wrap_execop_fn, job)

    _EvaluateJobProcessorResult(queue.depmgr, job, processor())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the task name set for its duration.

    The name is set using the opcode before execution and reset (by passing
    C{None}) afterwards, whether or not the execution raised an exception.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode, passed as first argument to C{execop_fn}
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @param op: Opcode whose C{TinySummary} is appended to the name; can be
      C{None}
    @rtype: string
    @return: C{"Job<id>"}, followed by C{"/"} and the opcode summary if an
      opcode was given

    """
    name = "Job%s" % job.id

    if not op:
      return "/".join([name])

    return "/".join([name, op.TinySummary()])
1425 0aeeb6e3 Michael Hanselmann
1426 e2715f69 Michael Hanselmann
1427 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool for processing jobs, using L{_JobQueueWorker} workers.

  """
  def __init__(self, queue):
    """Initializes the pool.

    The base class is set up with the name C{"Jq"}, L{JOBQUEUE_THREADS} and
    the L{_JobQueueWorker} class.

    @param queue: Job queue object, made available as C{self.queue} (read by
      the workers through C{self.pool.queue})

    """
    base_init = super(_JobQueueWorkerPool, self).__init__
    base_init("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    self.queue = queue
1436 e2715f69 Michael Hanselmann
1437 e2715f69 Michael Hanselmann
1438 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1439 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1440 b95479a5 Michael Hanselmann

1441 b95479a5 Michael Hanselmann
  """
1442 b95479a5 Michael Hanselmann
  (WAIT,
1443 b95479a5 Michael Hanselmann
   ERROR,
1444 b95479a5 Michael Hanselmann
   CANCEL,
1445 b95479a5 Michael Hanselmann
   CONTINUE,
1446 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1447 b95479a5 Michael Hanselmann
1448 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1449 b95479a5 Michael Hanselmann
    """Initializes this class.
1450 b95479a5 Michael Hanselmann

1451 b95479a5 Michael Hanselmann
    """
1452 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1453 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1454 b95479a5 Michael Hanselmann
1455 b95479a5 Michael Hanselmann
    self._waiters = {}
1456 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1457 b95479a5 Michael Hanselmann
1458 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1459 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1460 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1461 fcb21ad7 Michael Hanselmann

1462 fcb21ad7 Michael Hanselmann
    @type requested: set
1463 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1464 fcb21ad7 Michael Hanselmann

1465 fcb21ad7 Michael Hanselmann
    """
1466 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1467 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1468 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1469 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1470 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1471 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1472 fcb21ad7 Michael Hanselmann
            if waiters]
1473 fcb21ad7 Michael Hanselmann
1474 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1475 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1476 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1477 b95479a5 Michael Hanselmann

1478 b95479a5 Michael Hanselmann
    """
1479 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1480 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1481 b95479a5 Michael Hanselmann
1482 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1483 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1484 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1485 b95479a5 Michael Hanselmann

1486 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1487 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1488 b95479a5 Michael Hanselmann

1489 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1490 b95479a5 Michael Hanselmann
    @param job: Job object
1491 76b62028 Iustin Pop
    @type dep_job_id: int
1492 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1493 b95479a5 Michael Hanselmann
    @type dep_status: list
1494 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1495 b95479a5 Michael Hanselmann

1496 b95479a5 Michael Hanselmann
    """
1497 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1498 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1499 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1500 b95479a5 Michael Hanselmann
1501 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1502 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1503 b95479a5 Michael Hanselmann
1504 b95479a5 Michael Hanselmann
    # Get status of dependency job
1505 b95479a5 Michael Hanselmann
    try:
1506 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1507 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1508 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1509 b95479a5 Michael Hanselmann
1510 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1511 b95479a5 Michael Hanselmann
1512 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1513 b95479a5 Michael Hanselmann
1514 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1515 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1516 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1517 b95479a5 Michael Hanselmann
      return (self.WAIT,
1518 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1519 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1520 b95479a5 Michael Hanselmann
1521 b95479a5 Michael Hanselmann
    # Remove from waiters list
1522 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1523 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1524 b95479a5 Michael Hanselmann
1525 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1526 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1527 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1528 b95479a5 Michael Hanselmann
1529 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1530 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1531 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1532 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1533 b95479a5 Michael Hanselmann
1534 b95479a5 Michael Hanselmann
    else:
1535 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1536 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1537 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1538 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1539 b95479a5 Michael Hanselmann
1540 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1541 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1542 37d76f1e Michael Hanselmann

1543 37d76f1e Michael Hanselmann
    """
1544 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1545 37d76f1e Michael Hanselmann
                   if not waiters]:
1546 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1547 37d76f1e Michael Hanselmann
1548 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1549 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1550 b95479a5 Michael Hanselmann

1551 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1552 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1553 76b62028 Iustin Pop
    @type job_id: int
1554 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1555 b95479a5 Michael Hanselmann

1556 b95479a5 Michael Hanselmann
    """
1557 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1558 b95479a5 Michael Hanselmann
1559 37d76f1e Michael Hanselmann
    self._lock.acquire()
1560 37d76f1e Michael Hanselmann
    try:
1561 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1562 37d76f1e Michael Hanselmann
1563 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1564 37d76f1e Michael Hanselmann
    finally:
1565 37d76f1e Michael Hanselmann
      self._lock.release()
1566 37d76f1e Michael Hanselmann
1567 b95479a5 Michael Hanselmann
    if jobs:
1568 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1569 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1570 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1571 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1572 b95479a5 Michael Hanselmann
1573 b95479a5 Michael Hanselmann
1574 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1575 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1576 ea03467c Iustin Pop

1577 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1578 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1579 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1580 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1581 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1582 ea03467c Iustin Pop

1583 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1584 f1da30e6 Michael Hanselmann

1585 6c881c52 Iustin Pop
  Example::
1586 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1587 6c881c52 Iustin Pop
    @_RequireOpenQueue
1588 6c881c52 Iustin Pop
    def Example(self):
1589 6c881c52 Iustin Pop
      pass
1590 db37da70 Michael Hanselmann

1591 6c881c52 Iustin Pop
  """
1592 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1593 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1594 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1595 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1596 6c881c52 Iustin Pop
  return wrapper
1597 db37da70 Michael Hanselmann
1598 db37da70 Michael Hanselmann
1599 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1600 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1601 c8d0be94 Michael Hanselmann

1602 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1603 c8d0be94 Michael Hanselmann

1604 c8d0be94 Michael Hanselmann
  """
1605 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1606 c8d0be94 Michael Hanselmann
    """Wrapper function.
1607 c8d0be94 Michael Hanselmann

1608 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1609 c8d0be94 Michael Hanselmann

1610 c8d0be94 Michael Hanselmann
    """
1611 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1612 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1613 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1614 c8d0be94 Michael Hanselmann
    if self._drained:
1615 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1616 6d5ea385 Michael Hanselmann
1617 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1618 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1619 6d5ea385 Michael Hanselmann
1620 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1621 c8d0be94 Michael Hanselmann
  return wrapper
1622 c8d0be94 Michael Hanselmann
1623 c8d0be94 Michael Hanselmann
1624 6c881c52 Iustin Pop
class JobQueue(object):
1625 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1626 db37da70 Michael Hanselmann

1627 6c881c52 Iustin Pop
  """
1628 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Initializes the job queue object and then inspects the jobs currently
    on disk (see L{_InspectQueue}), either restarting them (if they were
    queued) or aborting them (if they were already running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code which does not acquire it at all must be safe to
    # run concurrently with all code acquiring it, be it shared or exclusive.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Jobs are accepted unless told otherwise
    self._accepting_jobs = True

    # Initialize the queue and take the file lock, which makes sure no other
    # process is working on the job queue
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Last used serial number, as stored in the serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Initial list of nodes: name to primary IP, master candidates only
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # The master node itself is not a replication target
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      # Whatever went wrong, do not leave worker threads behind; the original
      # exception is passed on to the caller
      self._wpool.TerminateWorkers()
      raise
1694 711b5124 Michael Hanselmann
1695 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Goes through the whole job queue and resumes unfinished jobs.

    Jobs found in queued or waiting state are enqueued again; jobs found in
    running or canceling state have their unfinished opcodes marked as failed
    and are finalized.

    The lock is needed here because WorkerPool.AddTask() may start a job
    while this function is still doing its work.

    """
    logging.info("Inspecting job queue")

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    last_idx = jobs_count - 1

    # States in which a job was interrupted by the previous shutdown
    interrupted = (constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_WAITING,
                   constants.JOB_STATUS_CANCELING)

    pending = []
    lastinfo = time.time()

    for (idx, job_id) in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds, and for the last job
      report = (idx % 1000 == 0 or
                time.time() >= (lastinfo + 10.0) or
                idx == last_idx)
      if report:
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        pending.append(job)
        continue

      if status not in interrupted:
        # Nothing to do for any other status
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        pending.append(job)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

      self.UpdateJobUnlocked(job)

    if pending:
      logging.info("Restarting %s jobs", len(pending))
      self._EnqueueJobsUnlocked(pending)

    logging.info("Job queue inspection finished")
1751 85f03e0d Michael Hanselmann
1752 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Creates a job queue RPC runner bound to this queue's context.

    @param address_list: passed through unchanged to L{rpc.JobQueueRunner}
    @return: the newly created runner

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1757 fb1ffbca Michael Hanselmann
1758 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is a master
    candidate, all non-archived job files and the serial file are uploaded
    to it and it is recorded as replication target; otherwise it is dropped
    from the list of replication targets.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    purge_result = self._GetRpc(None).call_jobqueue_purge(node_name)
    purge_msg = purge_result.fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, purge_msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication of the
      # job files
      self._nodes.pop(node_name, None)
      return

    # Upload the whole queue excluding archived jobs ...
    files = []
    for job_id in self._GetJobIDsUnlocked():
      files.append(self._GetJobPath(job_id))

    # ... followed by the current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      upload = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      upload_msg = upload[node_name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, upload_msg)

    self._nodes[node_name] = node.primary_ip
1804 d2e03a33 Michael Hanselmann
1805 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    if node_name in self._nodes:
      del self._nodes[node_name]
1815 23752136 Michael Hanselmann
1816 7e950d31 Iustin Pop
  @staticmethod
1817 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1818 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1819 ea03467c Iustin Pop

1820 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1821 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1822 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1823 ea03467c Iustin Pop

1824 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1825 ea03467c Iustin Pop
    @type nodes: list
1826 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1827 ea03467c Iustin Pop
    @type failmsg: str
1828 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1829 ea03467c Iustin Pop

1830 ea03467c Iustin Pop
    """
1831 e74798c1 Michael Hanselmann
    failed = []
1832 e74798c1 Michael Hanselmann
    success = []
1833 e74798c1 Michael Hanselmann
1834 e74798c1 Michael Hanselmann
    for node in nodes:
1835 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1836 c8457ce7 Iustin Pop
      if msg:
1837 e74798c1 Michael Hanselmann
        failed.append(node)
1838 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1839 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1840 c8457ce7 Iustin Pop
      else:
1841 c8457ce7 Iustin Pop
        success.append(node)
1842 e74798c1 Michael Hanselmann
1843 e74798c1 Michael Hanselmann
    # +1 for the master node
1844 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1845 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1846 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1847 e74798c1 Michael Hanselmann
1848 99aabbed Iustin Pop
  def _GetNodeIp(self):
1849 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1850 99aabbed Iustin Pop

1851 ea03467c Iustin Pop
    @rtype: (list, list)
1852 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1853 ea03467c Iustin Pop
        names and the second one with the node addresses
1854 ea03467c Iustin Pop

1855 99aabbed Iustin Pop
    """
1856 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1857 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1858 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1859 99aabbed Iustin Pop
    return name_list, addr_list
1860 99aabbed Iustin Pop
1861 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and optionally replicates it to all nodes.

    The file is written on the local node using the master daemon's user
    and group. If requested, the new contents are then sent to all nodes
    known to the queue and the RPC results are checked.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data,
                    uid=ents.masterd_uid, gid=ents.masterd_gid)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    runner = self._GetRpc(addrs)
    result = _CallJqUpdate(runner, names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1883 23752136 Michael Hanselmann
1884 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All files are first renamed in the local queue directory (creating
    target directories where needed); the same list of renames is then
    sent to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first ...
    for (old, new) in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... followed by all other nodes
    (names, addrs) = self._GetNodeIp()
    runner = self._GetRpc(addrs)
    result = runner.call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1902 abc1f2ce Michael Hanselmann
1903 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    serial number is written to the serial file (and replicated) before
    the in-memory counter is advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # Highest serial number once "count" new ones have been handed out
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = []
    for value in range(self._last_serial + 1, serial + 1):
      result.append(jstore.FormatJobID(value))

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1932 f1da30e6 Michael Hanselmann
1933 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the path of the job file for a given job id.

    @type job_id: int
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(pathutils.QUEUE_DIR, file_name)
1944 f1da30e6 Michael Hanselmann
1945 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the path of the archived job file for a given job id.

    @type job_id: int
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
    archive_subdir = jstore.GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(archive_root, archive_subdir, file_name)
1958 0cb94105 Michael Hanselmann
1959 cb66225d Michael Hanselmann
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    The queue directory always comes first; if requested, it is followed
    by one entry per visible file in the archive directory.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    directories = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      for entry in utils.ListVisibleFiles(archive_path):
        directories.append(utils.PathJoin(archive_path, entry))

    return directories
1976 0422250e Michael Hanselmann
1977 0422250e Michael Hanselmann
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to also look into the directories for
        archived jobs
    @rtype: list
    @return: the list of job IDs

    """
    job_ids = []

    for directory in cls._DetermineJobDirectories(archived):
      for name in utils.ListVisibleFiles(directory):
        match = constants.JOB_FILE_RE.match(name)
        if not match:
          # Not a job file
          continue
        job_ids.append(int(match.group(1)))

    if sort:
      job_ids.sort()

    return job_ids
2002 911a495b Iustin Pop
2003 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache. A job file which cannot
    be parsed is moved to the archive and C{None} is returned.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      assert cached.writable, "Found read-only job in memcache"
      return cached

    try:
      job = self._LoadJobFromDisk(job_id, False)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path != new_path:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
2043 162c8636 Guido Trotter
2044 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2045 162c8636 Guido Trotter
    """Load the given job file from disk.
2046 162c8636 Guido Trotter

2047 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2048 162c8636 Guido Trotter

2049 76b62028 Iustin Pop
    @type job_id: int
2050 162c8636 Guido Trotter
    @param job_id: job identifier
2051 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2052 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2053 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
2054 162c8636 Guido Trotter
    @return: either None or the job object
2055 162c8636 Guido Trotter

2056 162c8636 Guido Trotter
    """
2057 8a3cd185 Michael Hanselmann
    path_functions = [(self._GetJobPath, False)]
2058 194c8ca4 Michael Hanselmann
2059 194c8ca4 Michael Hanselmann
    if try_archived:
2060 8a3cd185 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, True))
2061 194c8ca4 Michael Hanselmann
2062 194c8ca4 Michael Hanselmann
    raw_data = None
2063 8a3cd185 Michael Hanselmann
    archived = None
2064 194c8ca4 Michael Hanselmann
2065 8a3cd185 Michael Hanselmann
    for (fn, archived) in path_functions:
2066 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
2067 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
2068 194c8ca4 Michael Hanselmann
      try:
2069 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
2070 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
2071 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
2072 194c8ca4 Michael Hanselmann
          raise
2073 194c8ca4 Michael Hanselmann
      else:
2074 194c8ca4 Michael Hanselmann
        break
2075 194c8ca4 Michael Hanselmann
2076 194c8ca4 Michael Hanselmann
    if not raw_data:
2077 194c8ca4 Michael Hanselmann
      return None
2078 13998ef2 Michael Hanselmann
2079 c0f6d0d8 Michael Hanselmann
    if writable is None:
2080 8a3cd185 Michael Hanselmann
      writable = not archived
2081 c0f6d0d8 Michael Hanselmann
2082 94ed59a5 Iustin Pop
    try:
2083 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2084 8a3cd185 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable, archived)
2085 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2086 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2087 94ed59a5 Iustin Pop
2088 ac0930b9 Iustin Pop
    return job
2089 f1da30e6 Michael Hanselmann
2090 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk without raising on bad files.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If the job can't be read or parsed, the exception is logged and
    C{None} is returned instead.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed through to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2110 0f9c08dc Guido Trotter
2111 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2112 20571a26 Guido Trotter
    """Update the queue size.
2113 20571a26 Guido Trotter

2114 20571a26 Guido Trotter
    """
2115 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2116 20571a26 Guido Trotter
2117 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is first handed to L{jstore.SetDrainFlag} and afterwards
    mirrored in the in-memory C{_drained} attribute.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: Always C{True}

    """
    # The cached value is only touched if storing the flag didn't raise
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    return True
2131 3ccafd0e Iustin Pop
2132 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    Builds the job object, validates the priority and the dependencies of
    each of its opcodes, writes the job to disk, increments the cached queue
    size and adds the job to the in-memory cache. The job is not passed to
    the worker pool here; the returned object is meant to be queued by the
    caller.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    valid_priorities = constants.OP_PRIO_SUBMIT_VALID
    dep_check = opcodes.TNoRelativeJobDependencies

    for (idx, op) in enumerate(job.ops):
      # Only priorities allowed at submission time are accepted
      if op.priority not in valid_priorities:
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" %
                                  (idx, op.priority,
                                   utils.CommaJoin(valid_priorities)))

      # Relative job dependencies must have been resolved by now
      deps = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not dep_check(deps):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" % (idx, dep_check, deps))

    # Write to disk before accounting for the job and caching it
    self.UpdateJobUnlocked(job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
2178 f1da30e6 Michael Hanselmann
2179 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job and hand it to the worker pool.

    @see: L{_SubmitJobUnlocked}
    @type ops: list
    @param ops: The list of OpCodes that will become the new job
    @return: ID of the newly created job

    """
    # Exactly one serial is requested, hence the one-element unpacking
    (job_id, ) = self._NewSerialsUnlocked(1)

    job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobsUnlocked([job])

    return job_id
2191 2971c913 Iustin Pop
2192 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs and hand them to the worker pool.

    @see: L{_SubmitJobUnlocked}
    @type jobs: list
    @param jobs: One list of opcodes per job to be created
    @rtype: list
    @return: One C{(status, data)} tuple per requested job, as produced by
      L{_SubmitManyJobsUnlocked}

    """
    # One new job ID per requested job
    new_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = self._SubmitManyJobsUnlocked(jobs, new_ids, [])

    # Only jobs which were stored successfully are scheduled
    self._EnqueueJobsUnlocked(added_jobs)

    return results
2209 2971c913 Iustin Pop
2210 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    @param msg: Error message
    @param ops: Opcodes of the job; the summary of each one is appended to
      the message
    @return: Formatted message

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2217 b247c6fc Michael Hanselmann
2218 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    Dependencies whose job ID is not relative are copied unchanged.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID; expected to
      raise C{IndexError} if the ID can't be resolved
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Not a relative ID, use as-is
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0

      try:
        abs_job_id = resolve_fn(dep_job_id)
      except IndexError:
        # Abort at the first unresolvable ID
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((abs_job_id, dep_status))

    return (True, resolved)
2248 b247c6fc Michael Hanselmann
2249 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2250 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2251 b247c6fc Michael Hanselmann

2252 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2253 b247c6fc Michael Hanselmann

2254 b247c6fc Michael Hanselmann
    """
2255 b247c6fc Michael Hanselmann
    results = []
2256 b247c6fc Michael Hanselmann
    added_jobs = []
2257 b247c6fc Michael Hanselmann
2258 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2259 b247c6fc Michael Hanselmann
      assert reljobid < 0
2260 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2261 b247c6fc Michael Hanselmann
2262 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2263 b247c6fc Michael Hanselmann
      for op in ops:
2264 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2265 b247c6fc Michael Hanselmann
          (status, data) = \
2266 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2267 b247c6fc Michael Hanselmann
                                         op.depends)
2268 b247c6fc Michael Hanselmann
          if not status:
2269 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2270 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2271 b247c6fc Michael Hanselmann
            break
2272 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2273 b247c6fc Michael Hanselmann
          op.depends = data
2274 b247c6fc Michael Hanselmann
      else:
2275 b247c6fc Michael Hanselmann
        try:
2276 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2277 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2278 b247c6fc Michael Hanselmann
          status = False
2279 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2280 b247c6fc Michael Hanselmann
        else:
2281 b247c6fc Michael Hanselmann
          status = True
2282 b247c6fc Michael Hanselmann
          data = job_id
2283 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2284 b247c6fc Michael Hanselmann
2285 b247c6fc Michael Hanselmann
      results.append((status, data))
2286 b247c6fc Michael Hanselmann
2287 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2288 b247c6fc Michael Hanselmann
2289 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Locked variant of L{_EnqueueJobsUnlocked}.

    Adds the given jobs to the worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    result = self._EnqueueJobsUnlocked(jobs)
    return result
2298 75d81fc8 Michael Hanselmann
2299 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    Each job becomes one task whose priority is the job's calculated
    priority and whose task ID is derived using L{_GetIdAttr}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    # Three parallel lists: task arguments, priorities and task IDs
    tasks = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]
    task_ids = map(_GetIdAttr, jobs)

    self._wpool.AddManyTasks(tasks, priority=priorities, task_id=task_ids)
2310 7b5c4a69 Michael Hanselmann
2311 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2312 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2313 b95479a5 Michael Hanselmann

2314 76b62028 Iustin Pop
    @type job_id: int
2315 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2316 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2317 b95479a5 Michael Hanselmann

2318 b95479a5 Michael Hanselmann
    """
2319 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2320 b95479a5 Michael Hanselmann
2321 b95479a5 Michael Hanselmann
    # Try to load from disk
2322 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2323 c0f6d0d8 Michael Hanselmann
2324 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2325 b95479a5 Michael Hanselmann
2326 b95479a5 Michael Hanselmann
    if job:
2327 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2328 b95479a5 Michael Hanselmann
2329 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2330 b95479a5 Michael Hanselmann
2331 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    Serializes the job and writes it to its job file. Needs to be called
    after a job has been modified so the changes are stored and, if
    requested, replicated to the other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # A job must have an end timestamp if and only if it is finalized
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      lacks_end = (job.end_timestamp is None)
      assert (is_finalized ^ lacks_end)
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())

    logging.debug("Writing job %s to %s", job.id, path)
    self._UpdateJobQueueFile(path, serialized, replicate)
2355 ac0930b9 Iustin Pop
2356 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual waiting is delegated to a L{_WaitForJobChangesHelper}
    instance, which is given the path of the job file and a function
    loading the job read-only from disk.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    # Loader for the job; archived jobs are tried too
    loader = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                            writable=False)

    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, loader,
                  fields, prev_job_info, prev_log_serial, timeout)
2387 dfe57c22 Michael Hanselmann
2388 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @return: Result of L{_ModifyJobUnlocked}, i.e. a tuple of
      (status boolean, message string)

    """
    logging.info("Cancelling job %s", job_id)

    def _Cancel(job):
      return job.Cancel()

    return self._ModifyJobUnlocked(job_id, _Cancel)
2402 aebd0e4e Michael Hanselmann
2403 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @return: Result of L{_ModifyJobUnlocked}, i.e. a tuple of
      (status boolean, message string)
    @raise errors.GenericError: If the priority is not one of those allowed
      at submission time

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    valid_priorities = constants.OP_PRIO_SUBMIT_VALID
    if priority not in valid_priorities:
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, utils.CommaJoin(valid_priorities)))

    def _ApplyPriority(job):
      (success, msg) = job.ChangePriority(priority)

      if not success:
        return (success, msg)

      # Tell the worker pool too; the job may not be scheduled right now
      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, _ApplyPriority)
2433 4679547e Michael Hanselmann
2434 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2435 aebd0e4e Michael Hanselmann
    """Modifies a job.
2436 aebd0e4e Michael Hanselmann

2437 aebd0e4e Michael Hanselmann
    @type job_id: int
2438 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2439 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2440 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2441 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2442 aebd0e4e Michael Hanselmann

2443 aebd0e4e Michael Hanselmann
    """
2444 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2445 188c5e0a Michael Hanselmann
    if not job:
2446 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2447 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2448 fbf0262f Michael Hanselmann
2449 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2450 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2451 c0f6d0d8 Michael Hanselmann
2452 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2453 188c5e0a Michael Hanselmann
2454 099b2870 Michael Hanselmann
    if success:
2455 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2456 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2457 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2458 fbf0262f Michael Hanselmann
2459 099b2870 Michael Hanselmann
    return (success, msg)
2460 fbf0262f Michael Hanselmann
2461 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Only jobs whose status is in L{constants.JOBS_FINALIZED} are archived,
    all others are skipped. Archiving consists of renaming the job file to
    its path in the archive.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of jobs selected for archival; whether renaming their
      files actually succeeded is not verified (see the TODO below)

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
2499 78d12585 Michael Hanselmann
2500 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)

    if job:
      # Exactly one job was passed, so a count of one means success
      archived_count = self._ArchiveJobsUnlocked([job])
      return archived_count == 1

    logging.debug("Job %s not found", job_id)
    return False
2521 07cd723a Iustin Pop
2522 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and for jobs without a start timestamp
    the received timestamp is used. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: maximum time in seconds to spend examining jobs; jobs
      already selected when the timeout expires are still archived
    @rtype: tuple; (int, int)
    @return: number of archived jobs and number of jobs which were not
      examined because the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after the timeout check; the job at
      # which the loop is aborted hasn't been looked at and must be reported
      # as remaining
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        # NOTE(review): timestamps look like sequences with the seconds as
        # first element -- confirm against _QueuedJob
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2577 07cd723a Iustin Pop
2578 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it refers to.

    @param fields: List of wanted fields
    @param qfilter: Query filter
    @rtype: tuple
    @return: The query object, a list of C{(job_id, job)} tuples and a
      boolean telling whether all jobs were listed (i.e. the filter didn't
      name specific jobs); when specific jobs were requested, jobs which
      failed to load are included with C{None} as job object

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()
    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

      if list_all and job is None:
        # Unloadable jobs are silently dropped when listing everything
        continue

      jobs.append((job_id, job))

    return (qobj, jobs, list_all)
2604 e07f7f7a Michael Hanselmann
2605 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: Result of L{query.GetQueryResponse} for the matching jobs

    """
    prepared = self._Query(fields, qfilter)
    (qobj, ctx, _) = prepared

    # Jobs are passed on in the order in which they were loaded
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2617 e07f7f7a Michael Hanselmann
2618 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # backwards compat: job IDs may be given in a form needing conversion
    # to integers. "None" is documented as "all jobs" and must not be
    # iterated over.
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]

    # NOTE(review): qlang.MakeSimpleFilter is expected to return no filter
    # for None or an empty list -- confirm
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2637 e2715f69 Michael Hanselmann
2638 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Stops accepting jobs and reports whether any job is still running.

    The first call clears the "accepting jobs" flag and deactivates the
    worker pool so pending tasks are no longer processed; later calls only
    report the current state. As long as running jobs are reported, the job
    queue is not yet ready for shutdown. Once this function returns
    C{False}, L{Shutdown} can be called without interfering with any job.
    Queued and unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    was_accepting = self._accepting_jobs

    if was_accepting:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    has_running_tasks = self._wpool.HasRunningTasks()
    return has_running_tasks
2662 6d5ea385 Michael Hanselmann
2663 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Tells whether the queue still accepts new jobs.

    The flag reported here is cleared by L{PrepareShutdown}; from then on
    no new jobs are accepted and the queue is shutting down. Unlike
    L{PrepareShutdown}, this method is not wrapped in the queue lock
    decorator.

    @rtype: bool
    @return: current value of the "accepting jobs" flag

    """
    accepting = self._accepting_jobs
    return accepting
2673 942e2262 Michael Hanselmann
2674 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue: the worker
    pool is asked to terminate its workers, then the queue file lock is
    closed and the reference to it is dropped.

    """
    # Workers are terminated before the queue lock is released
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_filelock
    queue_lock.Close()
    # Forget the closed lock object
    self._queue_filelock = None