Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ be6cdf67

History | View | Annotate | Download (76 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 95a4e33f Hrvoje Ribicic
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 704b51ff Klaus Aehlig
from ganeti import luxi
52 f1da30e6 Michael Hanselmann
from ganeti import opcodes
53 580b1fdd Jose A. Lopes
from ganeti import opcodes_base
54 7a1ecaed Iustin Pop
from ganeti import errors
55 e2715f69 Michael Hanselmann
from ganeti import mcpu
56 7996a135 Iustin Pop
from ganeti import utils
57 04ab05ce Michael Hanselmann
from ganeti import jstore
58 4869595d Petr Pudlak
import ganeti.rpc.node as rpc
59 82b22e19 René Nussbaumer
from ganeti import runtime
60 a744b676 Manuel Franceschini
from ganeti import netutils
61 989a8bee Michael Hanselmann
from ganeti import compat
62 b95479a5 Michael Hanselmann
from ganeti import ht
63 a06c6ae8 Michael Hanselmann
from ganeti import query
64 a06c6ae8 Michael Hanselmann
from ganeti import qlang
65 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
66 cffbbae7 Michael Hanselmann
from ganeti import vcluster
67 e2715f69 Michael Hanselmann
68 fbf0262f Michael Hanselmann
69 1daae384 Iustin Pop
#: Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25

# Names of the member locks handed to the @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Getter returning the "id" attribute of the object it is called with
_GetIdAttr = operator.attrgetter("id")
77 99fb250b Michael Hanselmann
78 498ae1cc Iustin Pop
79 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to signal that the current job is to be canceled.

  """
83 fbf0262f Michael Hanselmann
84 fbf0262f Michael Hanselmann
85 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Exception raised to abort a job because the job queue is shutting down.

  """
89 942e2262 Michael Hanselmann
90 942e2262 Michael Hanselmann
91 70552c46 Michael Hanselmann
def TimeStampNow():
  """Gets the current time as a split timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
99 70552c46 Michael Hanselmann
100 70552c46 Michael Hanselmann
101 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
  """Sends a job queue file update using the virtualized file name.

  @param runner: object providing C{call_jobqueue_update}
  @param names: passed through unchanged as first argument of the call
  @param file_name: path of the job queue file; converted with
      L{vcluster.MakeVirtualPath} before being sent
  @param content: passed through unchanged as last argument of the call
  @return: whatever C{runner.call_jobqueue_update} returns

  """
  virtual_path = vcluster.MakeVirtualPath(file_name)
  result = runner.call_jobqueue_update(names, virtual_path, content)
  return result
107 cffbbae7 Michael Hanselmann
108 cffbbae7 Michael Hanselmann
109 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Callable wrapper running a job query with a fixed list of fields.

  The query object is built once per instance, so the field list is cached
  and can be reused for many jobs, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Builds the query object for the given fields.

    @param fields: names of the job fields to query

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Runs the cached query against a single job.

    @param job: job object; its C{id} attribute is used as the name
    @return: the first (and only) row of the old-style query result

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
126 a06c6ae8 Michael Hanselmann
127 a06c6ae8 Michael Hanselmann
128 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Wraps an opcode object together with its execution state.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar priority: the current priority; initialized from the opcode but
  stored separately from it
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Creates a fresh, queued wrapper for an opcode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

    # No part of the execution has happened yet
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Initial priority only; it may be changed later without touching the
    # opcode itself
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    The keys C{input}, C{status}, C{result} and C{log} are required; the
    timestamps default to C{None} and the priority to
    L{constants.OP_PRIO_DEFAULT} when missing.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # __init__ is deliberately bypassed, all slots are filled from the state
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    restored.status = state["status"]
    restored.result = state["result"]
    restored.log = state["log"]

    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name))

    restored.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return restored

  def Serialize(self):
    """Converts this _QueuedOpCode into a dictionary.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    data = {
      "input": self.input.__getstate__(),
      }

    # Everything except the opcode itself is stored as-is
    for name in ("status", "result", "log", "start_timestamp",
                 "exec_timestamp", "end_timestamp", "priority"):
      data[name] = getattr(self, name)

    return data
201 f1048938 Iustin Pop
202 e2715f69 Michael Hanselmann
203 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: log serial counter; starts at zero for new jobs and is
    set to the highest serial found in the opcode logs when restoring
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified
  @ivar archived: Whether the job was already archived
  @ivar processor_lock: a C{threading.Lock} for writable jobs, C{None} for
    read-only ones

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def AddReasons(self, pickup=False):
    """Extend the reason trail

    Add the reason for all the opcodes of this job to be executed. Each
    opcode gets one new entry made of the reason source, a text of the form
    C{job=<id>;index=<position in job>} and the current time as returned by
    L{utils.EpochNano}.

    @type pickup: bool
    @param pickup: whether to use L{constants.OPCODE_REASON_SRC_PICKUP}
        instead of L{constants.OPCODE_REASON_SRC_OPCODE} as prefix of the
        reason source

    """
    # The prefix is the same for all opcodes, hence it's chosen only once
    if pickup:
      reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
    else:
      reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE

    for (index, queued_op) in enumerate(self.ops):
      op = queued_op.input
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                                reason_src_prefix)
      reason_text = "job=%d;index=%d" % (self.id, index)

      # Opcodes without a trail (attribute missing or None) start a new one
      reason = getattr(op, "reason", None)
      if reason is None:
        reason = []

      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: if C{ops} is empty

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    # Needs "id" and "ops", hence it can only be called at this point
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    These are the attributes which are not part of the serialized state.

    @type obj: L{_QueuedJob}
    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    """Returns a description with class name, job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # __init__ is bypassed on purpose; all attributes are set from the state
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The first element of a log entry is its serial; keep the highest one
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    Only the ID, the opcodes and the three timestamps are included.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped; if all opcodes were successful, the
        job status is "success"
      - the first opcode found which is canceling, canceled or finished with
        error determines the job status and ends the search
      - otherwise, the last opcode found with the status one of:
          - waiting
          - running

        will determine the job status

      - otherwise, the job status is "queued"

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # A job status is returned, hence the job constant must be used
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered; the numerically lowest of their
    priorities is returned. When all are done, the default priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected, in opcode order

    """
    if newer_than is None:
      # Lower than any serial compared against, so nothing is filtered out
      serial = -1
    else:
      serial = newer_than

    return [entry
            for op in self.ops
            for entry in op.log
            if entry[0] > serial]

  def GetInfo(self, fields):
    """Returns information about a job.

    The work is done by L{_SimpleJobQuery}, which also handles the field
    names; errors for unknown fields originate there.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are expected only before the unfinished ones
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Only the end timestamp is set; opcode statuses are left untouched.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    Queued jobs are canceled and finalized right away, waiting jobs are
    marked as canceling. Jobs in any other status are not modified.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    Only opcodes which are queued or waiting get the new priority; running
    and finalized opcodes are skipped.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)
552 4679547e Michael Hanselmann
553 f1048938 Iustin Pop
554 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks used by the mcpu code while one opcode of a job is processed.

  """
  def __init__(self, queue, job, op):
    """Remembers the queue, job and opcode the callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Stops processing if cancellation or a queue shutdown was requested.

    @raise CancelJob: if the opcode status is C{OP_STATUS_CANCELING}
    @raise QueueShutdown: if the queue reports it is not accepting jobs

    """
    # A pending cancellation is checked before the queue state
    cancel_requested = (self._op.status == constants.OP_STATUS_CANCELING)
    if cancel_requested:
      logging.debug("Canceling opcode")
      raise CancelJob()

    if self._queue.AcceptingJobsUnlocked():
      return

    # Queue no longer accepts jobs
    logging.debug("Queue is shutting down")
    raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from lock-waiting to running.

    The mcpu code calls this notifier right before the LU's Exec() method
    is started. For the transition to be visible to end users the opcode
    has to be in OP_STATUS_WAITING before Processor.ExecOpCode is called.

    """
    op = self._op

    assert op in self._job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Last chance to cancel before the opcode is marked as running
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Write out the updated job
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds one entry to the opcode's log and updates the job.

    @param timestamp: Timestamp stored with the entry
    @param log_type: Log entry type
    @param log_msg: Log message

    """
    job = self._job
    job.log_serial += 1

    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Appends a log entry.

    Accepts either C{(log_msg, )}, in which case the type defaults to
    C{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_msg, ) = args
      log_type = constants.ELOG_MESSAGE
    else:
      (log_type, log_msg) = args

    # Splitting the time makes serialization easier and avoids losing
    # precision
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CurrentPriority(self):
    """Returns the opcode's current priority.

    Raises the same exceptions as L{_CheckCancel} if processing should stop.

    """
    op = self._op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Honour cancellation requests before reporting the priority
    self._CheckCancel()

    return op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue takes care of it
    queue = self._queue
    return queue.SubmitManyJobs(jobs)
659 6a373640 Michael Hanselmann
660 031a3e57 Michael Hanselmann
661 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable deciding whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._prev_log_serial = prev_log_serial
    self._prev_job_info = prev_job_info
    self._squery = _SimpleJobQuery(fields)

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job changed or is no longer
      queued, waiting or running; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # A serialization round trip can change types (e.g. tuple to list) or
    # lose precision. It is applied here so the values undergo the same
    # modifications as the data received from the client; otherwise the
    # comparison below could fail without the data being significantly
    # different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    (job_info, log_entries) = \
      (serializer.LoadJson(serializer.DumpJson(job_info)),
       serializer.LoadJson(serializer.DumpJson(log_entries)))

    # Waiting only makes sense while the job can still change, the job info
    # is the same as before and there are no log entries with a new serial
    unchanged = (status in (constants.JOB_STATUS_QUEUED,
                            constants.JOB_STATUS_RUNNING,
                            constants.JOB_STATUS_WAITING) and
                 job_info == self._prev_job_info and
                 not (log_entries and
                      self._prev_log_serial != log_entries[0][0]))
    if unchanged:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
713 989a8bee Michael Hanselmann
714 989a8bee Michael Hanselmann
715 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  """Waits for inotify events on a job file.

  """
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = asyncnotifier.SingleFileEventHandler(
      self._wm, self._OnInotify, filename)
    self._notifier = pyinotify.Notifier(
      self._wm, default_proc_fun=self._inotify_handler)

    try:
      self._inotify_handler.enable()
    except Exception:
      # File descriptors are not closed automatically by pyinotify, hence
      # the notifier is stopped explicitly before passing the error on
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Re-enables the handler when it is reported as disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is handed to the notifier multiplied by 1000
    got_events = notifier.check_events(timeout * 1000)
    if got_events:
      notifier.read_events()

    # Events are processed regardless of whether new ones were read
    notifier.process_events()

    return got_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    notifier = self._notifier
    notifier.stop()
763 989a8bee Michael Hanselmann
764 989a8bee Michael Hanselmann
765 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  """Waiter which sets up the actual file waiter only on its second use.

  """
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._waiter_cls = _waiter_cls
    self._filename = filename
    # Created on the first call to L{Wait}
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    filewaiter = self._filewaiter

    if not filewaiter:
      # Lazy setup: the inotify setup cost is avoided if the job file had
      # already changed at the caller's first check. When getting here the
      # waiter is created and control returns immediately, so that the
      # caller checks the job file once more for changes made since its
      # last check. This avoids a race condition.
      self._filewaiter = self._waiter_cls(self._filename)
      return True

    return filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter.

    """
    filewaiter = self._filewaiter
    if filewaiter:
      filewaiter.Close()
802 989a8bee Michael Hanselmann
803 989a8bee Michael Hanselmann
804 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the job status and log serial a client has seen last, this class
  reports back once the current job status differs.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job once and checks it for changes.

    @param counter: Iterator counting the checks done so far
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Called with the job; returns C{None} if it is unchanged
    @raise errors.JobLost: if the job could not be loaded
    @raise utils.RetryAgain: if the job has not changed

    """
    if counter.next() > 0:
      # Every check but the first one gives the job some more time to change
      # again, which performs better for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    changes = check_fn(job)
    if changes is not None:
      return changes

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: The checker's result if the job changed, C{None} if the job
      was lost, C{constants.JOB_NOTCHANGED} if the timeout expired

    """
    attempts = itertools.count()
    try:
      checker = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      file_waiter = _waiter_cls(filename)
      try:
        check_once = compat.partial(self._CheckForChanges,
                                    attempts, job_load_fn, checker)
        return utils.Retry(check_once, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=file_waiter.Wait)
      finally:
        # The waiter is always released, whatever the outcome
        file_waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
863 6c2549d6 Guido Trotter
864 6c2549d6 Guido Trotter
865 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Anything which is not an L{errors.GenericError} is first converted to an
  L{errors.OpExecError} carrying the string form of the original error.

  @param err: The error to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
875 6760e4ed Michael Hanselmann
876 6760e4ed Michael Hanselmann
877 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
878 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
879 26d3fd2f Michael Hanselmann
    """Initializes this class.
880 26d3fd2f Michael Hanselmann

881 26d3fd2f Michael Hanselmann
    """
882 26d3fd2f Michael Hanselmann
    self._fn = fn
883 26d3fd2f Michael Hanselmann
    self._next = None
884 26d3fd2f Michael Hanselmann
885 26d3fd2f Michael Hanselmann
  def _Advance(self):
886 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
887 26d3fd2f Michael Hanselmann

888 26d3fd2f Michael Hanselmann
    """
889 26d3fd2f Michael Hanselmann
    if self._next is None:
890 26d3fd2f Michael Hanselmann
      self._next = self._fn()
891 26d3fd2f Michael Hanselmann
892 26d3fd2f Michael Hanselmann
  def Peek(self):
893 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
894 26d3fd2f Michael Hanselmann

895 26d3fd2f Michael Hanselmann
    """
896 26d3fd2f Michael Hanselmann
    self._Advance()
897 26d3fd2f Michael Hanselmann
    return self._next
898 26d3fd2f Michael Hanselmann
899 26d3fd2f Michael Hanselmann
  def Next(self):
900 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
901 26d3fd2f Michael Hanselmann

902 26d3fd2f Michael Hanselmann
    """
903 26d3fd2f Michael Hanselmann
    self._Advance()
904 26d3fd2f Michael Hanselmann
    result = self._next
905 26d3fd2f Michael Hanselmann
    self._next = None
906 26d3fd2f Michael Hanselmann
    return result
907 26d3fd2f Michael Hanselmann
908 26d3fd2f Michael Hanselmann
909 b80cc518 Michael Hanselmann
class _OpExecContext:
  """State kept while trying to execute a single opcode of a job.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: Queued opcode; its C{input} must provide C{Summary()}
    @param index: Index of the opcode
    @param log_prefix: Prefix for log messages about this opcode
    @param timeout_strategy_factory: Callable returning an object with a
      C{NextAttempt} method, used to compute lock acquire timeouts

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify. The dependency list is read once through
    # DEPEND_ATTR and that same value is copied, so the check and the copy
    # cannot refer to different attributes.
    depends = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
    if depends:
      self.jobdeps = depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: Whether the opcode's priority was increased

    """
    op = self.op

    # The priority is only increased once all retries are exhausted (the
    # next timeout is None, i.e. the next round would use a blocking
    # acquire) and the highest priority has not been reached yet
    if (self._timeout_strategy.Peek() is not None or
        op.priority <= constants.OP_PRIO_HIGHEST):
      return False

    logging.debug("Increasing priority")
    # Smaller numbers mean higher priority
    op.priority -= 1
    self._ResetTimeoutStrategy()
    return True

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
959 26d3fd2f Michael Hanselmann
960 b80cc518 Michael Hanselmann
961 be760ba8 Michael Hanselmann
class _JobProcessor(object):
962 75d81fc8 Michael Hanselmann
  (DEFER,
963 75d81fc8 Michael Hanselmann
   WAITDEP,
964 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
965 75d81fc8 Michael Hanselmann
966 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    @param queue: Queue object
    @param opexec_fn: Stored as C{opexec_fn}
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout strategy

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
975 be760ba8 Michael Hanselmann
976 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Context for the first opcode which is not finalized
    @raise errors.ProgrammerError: if an opcode is marked as running or no
      opcode is left

    """
    # The iterator is stored on the job and acts as a cache: future lookups
    # continue after the opcode found last
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx

    # Iterator exhausted without finding a pending opcode
    raise errors.ProgrammerError("Called for a finished job")
1015 be760ba8 Michael Hanselmann
1016 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the opcode status or a start timestamp was changed

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    was_queued = (op.status == constants.OP_STATUS_QUEUED)
    if was_queued:
      op.status = constants.OP_STATUS_WAITING

    op_not_started = (op.start_timestamp is None)
    if op_not_started:
      op.start_timestamp = TimeStampNow()

    job_not_started = (job.start_timestamp is None)
    if job_not_started:
      # The job starts with its first opcode
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return was_queued or op_not_started or job_not_started
1051 be760ba8 Michael Hanselmann
1052 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are handled in order; the first one which is not reported
    as C{CONTINUE} ends the processing.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op
    depmgr_cls = _JobDependencyManager

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == depmgr_cls.CONTINUE:
        # Dependency is done with, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if depresult == depmgr_cls.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if depresult == depmgr_cls.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (depmgr_cls.WRONGSTATUS, depmgr_cls.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No dependencies left
    return False
1107 b95479a5 Michael Hanselmann
1108 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1109 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1110 be760ba8 Michael Hanselmann

1111 be760ba8 Michael Hanselmann
    """
1112 b80cc518 Michael Hanselmann
    op = opctx.op
1113 b80cc518 Michael Hanselmann
1114 e6e17529 Hrvoje Ribicic
    assert op.status in (constants.OP_STATUS_WAITING,
1115 e6e17529 Hrvoje Ribicic
                         constants.OP_STATUS_CANCELING)
1116 e6e17529 Hrvoje Ribicic
1117 e6e17529 Hrvoje Ribicic
    # The very last check if the job was cancelled before trying to execute
1118 e6e17529 Hrvoje Ribicic
    if op.status == constants.OP_STATUS_CANCELING:
1119 e6e17529 Hrvoje Ribicic
      return (constants.OP_STATUS_CANCELING, None)
1120 be760ba8 Michael Hanselmann
1121 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1122 26d3fd2f Michael Hanselmann
1123 be760ba8 Michael Hanselmann
    try:
1124 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1125 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1126 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1127 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1128 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1129 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1130 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1131 9e49dfc5 Michael Hanselmann
1132 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1133 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1134 9e49dfc5 Michael Hanselmann
1135 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1136 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1137 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1138 9e49dfc5 Michael Hanselmann
1139 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1140 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1141 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1142 942e2262 Michael Hanselmann
1143 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1144 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1145 be760ba8 Michael Hanselmann
    except CancelJob:
1146 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1147 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1148 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1149 942e2262 Michael Hanselmann
1150 942e2262 Michael Hanselmann
    except QueueShutdown:
1151 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1152 942e2262 Michael Hanselmann
1153 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1154 942e2262 Michael Hanselmann
1155 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1156 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1157 942e2262 Michael Hanselmann
1158 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1159 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1160 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1161 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1162 be760ba8 Michael Hanselmann
    else:
1163 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1164 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1165 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1166 be760ba8 Michael Hanselmann
1167 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    At most one opcode of the job is processed per invocation. The queue lock
    is held in shared mode for the duration of the call, except while the
    opcode itself is being executed.

    @param _nextop_fn: Callback function for tests; only called if
      C{__debug__} is set and no previous opcode is pending
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check: opcodes after the current one must be untouched
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      # Set to a true value if the dependency manager takes over re-scheduling
      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          # The opcode must be executed without holding the queue lock; the
          # lock is always re-acquired, even if execution raises
          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          # All remaining opcodes fail along with the current one
          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      # The queue lock must be released even if the consistency check fails;
      # a failing assertion would otherwise leave the shared lock held
      try:
        assert job.writable, "Job became read-only while being processed"
      finally:
        queue.release()
1359 be760ba8 Michael Hanselmann
1360 be760ba8 Michael Hanselmann
1361 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts upon the status returned by L{_JobProcessor} for a job.

  Intended for use by L{_JobQueueWorker}.

  @param depmgr: Dependency manager, notified once the job has finished
  @param job: Job which was processed
  @param result: One of L{_JobProcessor.FINISHED}, L{_JobProcessor.DEFER} or
    L{_JobProcessor.WAITDEP}
  @raise workerpool.DeferTask: If the job has to be scheduled again
  @raise errors.ProgrammerError: If C{result} is not a known status

  """
  if result == _JobProcessor.FINISHED:
    # Wake up the jobs waiting for this one
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Hand the job back to the workerpool using its current priority
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do here, the dependency manager re-schedules the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1382 df5a5730 Michael Hanselmann
1383 df5a5730 Michael Hanselmann
1384 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread processing jobs taken from the workerpool.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding its per-job processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # A job registering for a dependency can be notified before the worker
    # which registered it is done, and hence show up in the tasklist more than
    # once. The per-job lock guarantees a single active worker per job.
    job.processor_lock.acquire()
    try:
      outcome = self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

    return outcome

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    The per-job lock must be held by the caller.

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Names the worker thread after the job and, if given, the opcode.

      """
      return self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcode executor which also keeps the thread name up to date
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               proc.ExecOpCode)

    depmgr = queue.depmgr
    proc_result = _JobProcessor(queue, execop_fn, job)()

    _EvaluateJobProcessorResult(depmgr, job, proc_result)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with its summary included in the thread name.

    The thread name is reset once the opcode is done, whether it succeeded
    or not.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @return: Whatever C{execop_fn} returned

    """
    setname_fn(op)
    try:
      op_result = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

    return op_result

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @return: C{"Job<id>"}, followed by a slash and the opcode's tiny summary
      if C{op} evaluates to true

    """
    name = "Job%s" % job.id

    if op:
      name = name + "/" + op.TinySummary()

    return name
1456 0aeeb6e3 Michael Hanselmann
1457 e2715f69 Michael Hanselmann
1458 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Workerpool running L{_JobQueueWorker} threads for the job queue.

  """
  def __init__(self, queue):
    """Sets up a pool of C{JOBQUEUE_THREADS} workers named "Jq".

    @param queue: Queue object, made available as C{self.queue}

    """
    pool_name = "Jq"
    worker_cls = _JobQueueWorker

    super(_JobQueueWorkerPool, self).__init__(pool_name, JOBQUEUE_THREADS,
                                              worker_cls)
    self.queue = queue
1467 e2715f69 Michael Hanselmann
1468 e2715f69 Michael Hanselmann
1469 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1470 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1471 b95479a5 Michael Hanselmann

1472 b95479a5 Michael Hanselmann
  """
1473 b95479a5 Michael Hanselmann
  (WAIT,
1474 b95479a5 Michael Hanselmann
   ERROR,
1475 b95479a5 Michael Hanselmann
   CANCEL,
1476 b95479a5 Michael Hanselmann
   CONTINUE,
1477 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1478 b95479a5 Michael Hanselmann
1479 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1480 b95479a5 Michael Hanselmann
    """Initializes this class.
1481 b95479a5 Michael Hanselmann

1482 b95479a5 Michael Hanselmann
    """
1483 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1484 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1485 b95479a5 Michael Hanselmann
1486 b95479a5 Michael Hanselmann
    self._waiters = {}
1487 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1488 b95479a5 Michael Hanselmann
1489 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1490 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1491 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1492 fcb21ad7 Michael Hanselmann

1493 fcb21ad7 Michael Hanselmann
    @type requested: set
1494 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1495 fcb21ad7 Michael Hanselmann

1496 fcb21ad7 Michael Hanselmann
    """
1497 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1498 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1499 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1500 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1501 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1502 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1503 fcb21ad7 Michael Hanselmann
            if waiters]
1504 fcb21ad7 Michael Hanselmann
1505 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1506 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1507 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1508 b95479a5 Michael Hanselmann

1509 b95479a5 Michael Hanselmann
    """
1510 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1511 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1512 b95479a5 Michael Hanselmann
1513 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1514 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1515 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1516 b95479a5 Michael Hanselmann

1517 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1518 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1519 b95479a5 Michael Hanselmann

1520 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1521 b95479a5 Michael Hanselmann
    @param job: Job object
1522 76b62028 Iustin Pop
    @type dep_job_id: int
1523 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1524 b95479a5 Michael Hanselmann
    @type dep_status: list
1525 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1526 b95479a5 Michael Hanselmann

1527 b95479a5 Michael Hanselmann
    """
1528 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1529 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1530 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1531 b95479a5 Michael Hanselmann
1532 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1533 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1534 b95479a5 Michael Hanselmann
1535 b95479a5 Michael Hanselmann
    # Get status of dependency job
1536 b95479a5 Michael Hanselmann
    try:
1537 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1538 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1539 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1540 b95479a5 Michael Hanselmann
1541 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1542 b95479a5 Michael Hanselmann
1543 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1544 b95479a5 Michael Hanselmann
1545 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1546 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1547 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1548 b95479a5 Michael Hanselmann
      return (self.WAIT,
1549 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1550 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1551 b95479a5 Michael Hanselmann
1552 b95479a5 Michael Hanselmann
    # Remove from waiters list
1553 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1554 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1555 b95479a5 Michael Hanselmann
1556 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1557 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1558 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1559 b95479a5 Michael Hanselmann
1560 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1561 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1562 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1563 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1564 b95479a5 Michael Hanselmann
1565 b95479a5 Michael Hanselmann
    else:
1566 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1567 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1568 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1569 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1570 b95479a5 Michael Hanselmann
1571 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1572 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1573 37d76f1e Michael Hanselmann

1574 37d76f1e Michael Hanselmann
    """
1575 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1576 37d76f1e Michael Hanselmann
                   if not waiters]:
1577 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1578 37d76f1e Michael Hanselmann
1579 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1580 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1581 b95479a5 Michael Hanselmann

1582 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1583 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1584 76b62028 Iustin Pop
    @type job_id: int
1585 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1586 b95479a5 Michael Hanselmann

1587 b95479a5 Michael Hanselmann
    """
1588 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1589 b95479a5 Michael Hanselmann
1590 37d76f1e Michael Hanselmann
    self._lock.acquire()
1591 37d76f1e Michael Hanselmann
    try:
1592 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1593 37d76f1e Michael Hanselmann
1594 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1595 37d76f1e Michael Hanselmann
    finally:
1596 37d76f1e Michael Hanselmann
      self._lock.release()
1597 37d76f1e Michael Hanselmann
1598 b95479a5 Michael Hanselmann
    if jobs:
1599 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1600 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1601 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1602 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1603 b95479a5 Michael Hanselmann
1604 b95479a5 Michael Hanselmann
1605 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1606 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1607 ea03467c Iustin Pop

1608 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1609 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1610 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1611 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1612 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1613 ea03467c Iustin Pop

1614 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1615 f1da30e6 Michael Hanselmann

1616 6c881c52 Iustin Pop
  Example::
1617 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1618 6c881c52 Iustin Pop
    @_RequireOpenQueue
1619 6c881c52 Iustin Pop
    def Example(self):
1620 6c881c52 Iustin Pop
      pass
1621 db37da70 Michael Hanselmann

1622 6c881c52 Iustin Pop
  """
1623 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1624 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1625 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1626 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1627 6c881c52 Iustin Pop
  return wrapper
1628 db37da70 Michael Hanselmann
1629 db37da70 Michael Hanselmann
1630 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1631 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1632 c8d0be94 Michael Hanselmann

1633 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1634 c8d0be94 Michael Hanselmann

1635 c8d0be94 Michael Hanselmann
  """
1636 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1637 c8d0be94 Michael Hanselmann
    """Wrapper function.
1638 c8d0be94 Michael Hanselmann

1639 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1640 c8d0be94 Michael Hanselmann

1641 c8d0be94 Michael Hanselmann
    """
1642 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1643 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1644 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1645 c8d0be94 Michael Hanselmann
    if self._drained:
1646 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1647 6d5ea385 Michael Hanselmann
1648 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1649 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1650 6d5ea385 Michael Hanselmann
1651 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1652 c8d0be94 Michael Hanselmann
  return wrapper
1653 c8d0be94 Michael Hanselmann
1654 c8d0be94 Michael Hanselmann
1655 6c881c52 Iustin Pop
class JobQueue(object):
1656 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1657 db37da70 Michael Hanselmann

1658 6c881c52 Iustin Pop
  """
1659 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the in-memory state of the queue, in this order: the job cache,
    the big queue lock, the queue file lock, the last used serial number,
    the master candidates to replicate to, the queue size and drain flag,
    the job dependency manager and finally the worker pool.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it in shared mode and all code
    # acquiring it exclusively.
    queue_lock = locking.SharedLock("JobQueue")
    self._lock = queue_lock

    self.acquire = queue_lock.acquire
    self.release = queue_lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes; only master candidates are kept
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
1720 711b5124 Michael Hanselmann
1721 fcd70b89 Klaus Aehlig
  def _PickupJobUnlocked(self, job_id):
1722 fcd70b89 Klaus Aehlig
    """Load a job from the job queue
1723 fcd70b89 Klaus Aehlig

1724 fcd70b89 Klaus Aehlig
    Pick up a job that already is in the job queue and start/resume it.
1725 fcd70b89 Klaus Aehlig

1726 fcd70b89 Klaus Aehlig
    """
1727 fcd70b89 Klaus Aehlig
    job = self._LoadJobUnlocked(job_id)
1728 fcd70b89 Klaus Aehlig
1729 fcd70b89 Klaus Aehlig
    if job is None:
1730 fcd70b89 Klaus Aehlig
      logging.warning("Job %s could not be read", job_id)
1731 fcd70b89 Klaus Aehlig
      return
1732 fcd70b89 Klaus Aehlig
1733 be6cdf67 Michele Tartara
    job.AddReasons(pickup=True)
1734 be6cdf67 Michele Tartara
1735 fcd70b89 Klaus Aehlig
    status = job.CalcStatus()
1736 fcd70b89 Klaus Aehlig
    if status == constants.JOB_STATUS_QUEUED:
1737 fcd70b89 Klaus Aehlig
      self._EnqueueJobsUnlocked([job])
1738 fcd70b89 Klaus Aehlig
      logging.info("Restarting job %s", job.id)
1739 fcd70b89 Klaus Aehlig
1740 fcd70b89 Klaus Aehlig
    elif status in (constants.JOB_STATUS_RUNNING,
1741 fcd70b89 Klaus Aehlig
                    constants.JOB_STATUS_WAITING,
1742 fcd70b89 Klaus Aehlig
                    constants.JOB_STATUS_CANCELING):
1743 fcd70b89 Klaus Aehlig
      logging.warning("Unfinished job %s found: %s", job.id, job)
1744 fcd70b89 Klaus Aehlig
1745 fcd70b89 Klaus Aehlig
      if status == constants.JOB_STATUS_WAITING:
1746 fcd70b89 Klaus Aehlig
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1747 fcd70b89 Klaus Aehlig
        self._EnqueueJobsUnlocked([job])
1748 fcd70b89 Klaus Aehlig
        logging.info("Restarting job %s", job.id)
1749 fcd70b89 Klaus Aehlig
      else:
1750 f3ac6f36 Klaus Aehlig
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
1751 fcd70b89 Klaus Aehlig
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1752 f3ac6f36 Klaus Aehlig
                              _EncodeOpError(to_encode))
1753 fcd70b89 Klaus Aehlig
        job.Finalize()
1754 fcd70b89 Klaus Aehlig
1755 fcd70b89 Klaus Aehlig
    self.UpdateJobUnlocked(job)
1756 fcd70b89 Klaus Aehlig
1757 fcd70b89 Klaus Aehlig
  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    """Locked front-end for L{_PickupJobUnlocked}.

    @param job_id: ID of the job to pick up

    """
    self._PickupJobUnlocked(job_id)
1760 fcd70b89 Klaus Aehlig
1761 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Builds an RPC runner bound to this queue's context.

    @param address_list: addresses handed on to L{rpc.JobQueueRunner}
    @return: the newly created L{rpc.JobQueueRunner}

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1766 fb1ffbca Michael Hanselmann
1767 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is not a
    master candidate, it is dropped from the replication list and nothing
    else happens. Otherwise all non-archived job files, the serial file and
    the drain flag are pushed to it, and it is added to the replication
    list. RPC failures are logged, not raised.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = self._GetRpc(None).call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs ...
    to_upload = []
    for job_id in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(job_id))

    # ... followed by the current serial file
    to_upload.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for path in to_upload:
      # Read file content
      payload = utils.ReadFile(path)

      upload = _CallJqUpdate(self._GetRpc(addrs), [name], path, payload)
      upload_error = upload[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    # Set queue drained flag
    drain_rpc = self._GetRpc(addrs)
    drain_result = drain_rpc.call_jobqueue_set_drain_flag([name],
                                                          self._drained)
    drain_error = drain_result[name].fail_msg
    if drain_error:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    name, drain_error)

    self._nodes[name] = node.primary_ip
1822 d2e03a33 Michael Hanselmann
1823 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    if node_name in self._nodes:
      del self._nodes[node_name]
1833 23752136 Michael Hanselmann
1834 7e950d31 Iustin Pop
  @staticmethod
1835 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1836 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1837 ea03467c Iustin Pop

1838 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1839 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1840 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1841 ea03467c Iustin Pop

1842 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1843 ea03467c Iustin Pop
    @type nodes: list
1844 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1845 ea03467c Iustin Pop
    @type failmsg: str
1846 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1847 ea03467c Iustin Pop

1848 ea03467c Iustin Pop
    """
1849 e74798c1 Michael Hanselmann
    failed = []
1850 e74798c1 Michael Hanselmann
    success = []
1851 e74798c1 Michael Hanselmann
1852 e74798c1 Michael Hanselmann
    for node in nodes:
1853 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1854 c8457ce7 Iustin Pop
      if msg:
1855 e74798c1 Michael Hanselmann
        failed.append(node)
1856 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1857 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1858 c8457ce7 Iustin Pop
      else:
1859 c8457ce7 Iustin Pop
        success.append(node)
1860 e74798c1 Michael Hanselmann
1861 e74798c1 Michael Hanselmann
    # +1 for the master node
1862 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1863 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1864 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1865 e74798c1 Michael Hanselmann
1866 99aabbed Iustin Pop
  def _GetNodeIp(self):
1867 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1868 99aabbed Iustin Pop

1869 ea03467c Iustin Pop
    @rtype: (list, list)
1870 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1871 ea03467c Iustin Pop
        names and the second one with the node addresses
1872 ea03467c Iustin Pop

1873 99aabbed Iustin Pop
    """
1874 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1875 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1876 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1877 99aabbed Iustin Pop
    return name_list, addr_list
1878 99aabbed Iustin Pop
1879 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The file is written with the master daemon's user, the daemons group
    and the job queue file permissions. If requested, the same content is
    then pushed to all nodes known to the queue and the RPC results are
    checked.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    outcome = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
    self._CheckRpcResult(outcome, self._nodes, "Updating %s" % file_name)
1902 23752136 Michael Hanselmann
1903 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1904 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1905 ea03467c Iustin Pop

1906 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1907 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1908 ea03467c Iustin Pop

1909 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1910 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1911 ea03467c Iustin Pop

1912 ea03467c Iustin Pop
    """
1913 dd875d32 Michael Hanselmann
    # Rename them locally
1914 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1915 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1916 abc1f2ce Michael Hanselmann
1917 dd875d32 Michael Hanselmann
    # ... and on all nodes
1918 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1919 fb1ffbca Michael Hanselmann
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1920 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1921 abc1f2ce Michael Hanselmann
1922 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(pathutils.QUEUE_DIR, file_name)
1933 f1da30e6 Michael Hanselmann
1934 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    The file lives in the archive sub-directory which
    L{jstore.GetArchiveDirectory} reports for the job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
    subdir = jstore.GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(archive_root, subdir, file_name)
1947 0cb94105 Michael Hanselmann
1948 cb66225d Michael Hanselmann
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    The live queue directory always comes first. The visible entries of the
    archive directory follow when requested.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    directories = [pathutils.QUEUE_DIR]

    if archived:
      archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
      for entry in utils.ListVisibleFiles(archive_root):
        directories.append(utils.PathJoin(archive_root, entry))

    return directories
1965 0422250e Michael Hanselmann
1966 0422250e Michael Hanselmann
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to look into the archive directories as well
    @rtype: list
    @return: the list of job IDs

    """
    match_job_file = constants.JOB_FILE_RE.match
    found = []

    for directory in cls._DetermineJobDirectories(archived):
      for entry in utils.ListVisibleFiles(directory):
        matched = match_job_file(entry)
        if not matched:
          # Not a job file
          continue
        found.append(int(matched.group(1)))

    if sort:
      found.sort()

    return found
1991 911a495b Iustin Pop
1992 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1993 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1994 ea03467c Iustin Pop

1995 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1996 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1997 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1998 ea03467c Iustin Pop

1999 76b62028 Iustin Pop
    @type job_id: int
2000 ea03467c Iustin Pop
    @param job_id: the job id
2001 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
2002 ea03467c Iustin Pop
    @return: either None or the job object
2003 ea03467c Iustin Pop

2004 ea03467c Iustin Pop
    """
2005 95a4e33f Hrvoje Ribicic
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2006 95a4e33f Hrvoje Ribicic
2007 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
2008 5685c1a5 Michael Hanselmann
    if job:
2009 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
2010 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
2011 5685c1a5 Michael Hanselmann
      return job
2012 ac0930b9 Iustin Pop
2013 3d6c5566 Guido Trotter
    try:
2014 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
2015 aa9f8167 Iustin Pop
      if job is None:
2016 aa9f8167 Iustin Pop
        return job
2017 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
2018 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
2019 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
2020 3d6c5566 Guido Trotter
      if old_path == new_path:
2021 3d6c5566 Guido Trotter
        # job already archived (future case)
2022 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
2023 3d6c5566 Guido Trotter
      else:
2024 3d6c5566 Guido Trotter
        # non-archived case
2025 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
2026 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
2027 3d6c5566 Guido Trotter
      return None
2028 162c8636 Guido Trotter
2029 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
2030 c0f6d0d8 Michael Hanselmann
2031 162c8636 Guido Trotter
    self._memcache[job_id] = job
2032 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
2033 162c8636 Guido Trotter
    return job
2034 162c8636 Guido Trotter
2035 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    The live queue is tried first. The archive is only consulted when
    C{try_archived} is set and the live file does not exist.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the restored job should be writable; if None,
      the job is writable exactly when it was not read from the archive
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object
    @raise errors.JobFileCorrupted: if the file content cannot be
      deserialized or restored
    @raise EnvironmentError: if reading fails for any reason other than the
      file not existing

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError as err:
        # A missing file just means "try the next location"
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception as err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job
2080 f1da30e6 Michael Hanselmann
2081 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed on unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2101 0f9c08dc Guido Trotter
2102 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2103 20571a26 Guido Trotter
    """Update the queue size.
2104 20571a26 Guido Trotter

2105 20571a26 Guido Trotter
    """
2106 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2107 20571a26 Guido Trotter
2108 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is changed locally first and then on all nodes known to the
    queue. The RPC results are checked afterwards.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always C{True}

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    # ... and on all nodes
    (node_names, node_addrs) = self._GetNodeIp()
    runner = self._GetRpc(node_addrs)
    outcome = runner.call_jobqueue_set_drain_flag(node_names, drain_flag)
    self._CheckRpcResult(outcome, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True
2130 3ccafd0e Iustin Pop
2131 704b51ff Klaus Aehlig
  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    The request is sent through a LUXI client connected to the query
    socket.

    @param ops: the opcodes making up the job
    @return: whatever the client's C{SubmitJob} returns

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitJob(ops)
2137 2971c913 Iustin Pop
2138 704b51ff Klaus Aehlig
  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained. The request is sent through a
    LUXI client connected to the query socket.

    @param ops: the opcodes making up the job
    @return: whatever the client's C{SubmitJobToDrainedQueue} returns

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitJobToDrainedQueue(ops)
2147 346c3037 Klaus Aehlig
2148 704b51ff Klaus Aehlig
  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Creates and stores multiple jobs by way of a LUXI client.

    A client is created for the socket at C{pathutils.QUERY_SOCKET} and
    C{jobs} is passed to its C{SubmitManyJobs} method.

    @param jobs: the jobs to submit
    @return: whatever the client's C{SubmitManyJobs} call returns

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitManyJobs(jobs)
2154 2971c913 Iustin Pop
2155 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Builds the text for an error which occurred while submitting a job.

    @param msg: error description
    @param ops: iterable of objects providing a C{Summary} method
    @return: C{msg} followed by the joined summaries of all C{ops}

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2162 b247c6fc Michael Hanselmann
2163 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Replaces relative job IDs in a dependency list by resolved ones.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID; an
      C{IndexError} raised by it is treated as "cannot be resolved"
    @type deps: list
    @param deps: Dependencies, each a pair of job ID and status
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Not a relative ID, taken over unchanged
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0
      try:
        job_id = resolve_fn(dep_job_id)
      except IndexError:
        # Abort on the first ID which can't be resolved
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((job_id, dep_status))

    return (True, resolved)
2193 b247c6fc Michael Hanselmann
2194 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Adds jobs to the worker pool's queue.

    Wrapper around L{_EnqueueJobsUnlocked}, decorated with
    C{locking.ssynchronized(_LOCK)}.

    @type jobs: list
    @param jobs: List of all jobs
    @return: result of L{_EnqueueJobsUnlocked}

    """
    result = self._EnqueueJobsUnlocked(jobs)
    return result
2203 75d81fc8 Michael Hanselmann
2204 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Adds jobs to the worker pool's queue; lock must already be held.

    Every job becomes one task whose only argument is the job itself, with
    the priority from the job's C{CalcPriority} and the task ID obtained
    through C{_GetIdAttr}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    task_args = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]
    task_ids = map(_GetIdAttr, jobs)

    self._wpool.AddManyTasks(task_args, priority=priorities,
                             task_id=task_ids)
2215 7b5c4a69 Michael Hanselmann
2216 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2217 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2218 b95479a5 Michael Hanselmann

2219 76b62028 Iustin Pop
    @type job_id: int
2220 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2221 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2222 b95479a5 Michael Hanselmann

2223 b95479a5 Michael Hanselmann
    """
2224 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2225 b95479a5 Michael Hanselmann
2226 b95479a5 Michael Hanselmann
    # Try to load from disk
2227 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2228 c0f6d0d8 Michael Hanselmann
2229 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2230 b95479a5 Michael Hanselmann
2231 b95479a5 Michael Hanselmann
    if job:
2232 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2233 b95479a5 Michael Hanselmann
2234 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2235 b95479a5 Michael Hanselmann
2236 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Writes a job's current state to its queue file.

    To be called after a job has been modified: the job is serialized to
    JSON and handed to C{_UpdateJobQueueFile}, which is also told whether
    the change should be replicated.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      has_no_end = job.end_timestamp is None
      # A job must have an end timestamp exactly when it is finalized
      assert is_finalized != has_no_end
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2260 ac0930b9 Iustin Pop
2261 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits until a job changes or the timeout expires.

    The actual waiting is delegated to a L{_WaitForJobChangesHelper}
    instance, which receives the path of the job file and a function
    loading the job read-only from disk.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    loader = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                            writable=False)
    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, loader, fields, prev_job_info, prev_log_serial,
                  timeout)
2292 dfe57c22 Michael Hanselmann
2293 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet. The
    cancellation itself is done by the job's C{Cancel} method, applied
    through L{_ModifyJobUnlocked}.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @return: result of L{_ModifyJobUnlocked}

    """
    logging.info("Cancelling job %s", job_id)

    def _Cancel(job):
      return job.Cancel()

    return self._ModifyJobUnlocked(job_id, _Cancel)
2307 aebd0e4e Michael Hanselmann
2308 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes the priority of a job.

    The new priority is validated, set on the job and, when that worked,
    also applied to the job's task in the worker pool.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @return: result of L{_ModifyJobUnlocked}
    @raise errors.GenericError: if C{priority} is not contained in
      C{constants.OP_PRIO_SUBMIT_VALID}

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    valid_priorities = constants.OP_PRIO_SUBMIT_VALID
    if priority not in valid_priorities:
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority,
                                 utils.CommaJoin(valid_priorities)))

    def _ApplyPriority(job):
      (success, msg) = job.ChangePriority(priority)

      if not success:
        return (success, msg)

      # The job accepted the change, now adjust its task as well
      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, _ApplyPriority)
2338 4679547e Michael Hanselmann
2339 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2340 aebd0e4e Michael Hanselmann
    """Modifies a job.
2341 aebd0e4e Michael Hanselmann

2342 aebd0e4e Michael Hanselmann
    @type job_id: int
2343 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2344 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2345 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2346 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2347 aebd0e4e Michael Hanselmann

2348 aebd0e4e Michael Hanselmann
    """
2349 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2350 188c5e0a Michael Hanselmann
    if not job:
2351 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2352 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2353 fbf0262f Michael Hanselmann
2354 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2355 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2356 c0f6d0d8 Michael Hanselmann
2357 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2358 188c5e0a Michael Hanselmann
2359 099b2870 Michael Hanselmann
    if success:
2360 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2361 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2362 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2363 fbf0262f Michael Hanselmann
2364 099b2870 Michael Hanselmann
    return (success, msg)
2365 fbf0262f Michael Hanselmann
2366 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Jobs whose status is not in C{constants.JOBS_FINALIZED} are skipped.
    For all others the job file is renamed from its queue path to its
    archive path, and the cached queue size is refreshed afterwards.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      # The message used to say "cancel", copied from elsewhere; this is the
      # archive path
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
2404 78d12585 Michael Hanselmann
2405 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}, which is called
    with a one-element list once the job has been loaded.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived_count = self._ArchiveJobsUnlocked([job])
      return archived_count == 1

    logging.debug("Job %s not found", job_id)
    return False
2426 07cd723a Iustin Pop
2427 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and the received timestamp if there is
    no start timestamp either. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    Jobs are examined one after the other until the timeout expires; the
    ones selected are archived in batches through
    L{_ArchiveJobsUnlocked}.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: maximum time in seconds to spend examining jobs
    @rtype: tuple; (int, int)
    @return: number of archived jobs and number of jobs which were not
      examined because the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Only count a job as examined once the timeout check has passed.
      # Doing this before the check counted the job at which the loop stops
      # as examined, making the number of remaining jobs one too low.
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most meaningful timestamp available for the job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared against "now"
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2482 07cd723a Iustin Pop
2483 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it refers to.

    If the query names no specific jobs, all job IDs of the queue are used
    and jobs which fail to load are left out. If specific jobs were
    requested, each of them is reported, with C{None} in place of the job
    when loading failed.

    @param fields: requested fields, passed to L{query.Query}
    @param qfilter: query filter, passed to L{query.Query}
    @rtype: tuple
    @return: the query object, a list of (job ID, job) pairs and a boolean
      telling whether all jobs were listed

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    requested_ids = qobj.RequestedNames()
    list_all = (requested_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      requested_ids = self._GetJobIDsUnlocked(archived=include_archived)

    loaded = []

    for job_id in requested_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if list_all and job is None:
        # When listing everything, unloadable jobs are silently dropped
        continue
      loaded.append((job_id, job))

    return (qobj, loaded, list_all)
2509 e07f7f7a Michael Hanselmann
2510 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    The query is prepared by L{_Query} and the response is built by
    L{query.GetQueryResponse}, without sorting by name.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: result of L{query.GetQueryResponse}

    """
    (qobj, job_data, _) = self._Query(fields, qfilter)
    response = query.GetQueryResponse(qobj, job_data, sort_by_name=False)

    return response
2522 e07f7f7a Michael Hanselmann
2523 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    A simple filter on the "id" field is built from the job IDs and the
    query is run through L{_Query}.

    @type job_ids: list or None
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # backwards compat: convert the IDs to integers. None is documented as
    # "all jobs" and must not be iterated, which would raise TypeError.
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]

    # NOTE(review): presumably MakeSimpleFilter turns None into "no filter";
    # confirm against qlang
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2542 e2715f69 Michael Hanselmann
2543 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepares the job queue for being stopped.

    On the first call the queue is marked as no longer accepting jobs and
    the worker pool is told to stop processing pending tasks. Every call
    reports whether the worker pool still has running tasks. As long as
    that is the case the queue is not yet ready for shutdown; once
    C{False} is returned, L{Shutdown} can be called without interfering
    with any job. Queued and unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    still_accepting = self._accepting_jobs

    if still_accepting:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    has_running = self._wpool.HasRunningTasks()
    return has_running
2567 6d5ea385 Michael Hanselmann
2568 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Tells whether the queue still accepts jobs.

    The value is cleared by L{PrepareShutdown}; from then on no new jobs
    are accepted and the queue is shutting down.

    @rtype: bool
    @return: current value of C{self._accepting_jobs}

    """
    accepting = self._accepting_jobs
    return accepting
2578 942e2262 Michael Hanselmann
2579 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This terminates the workers of the worker pool and closes the queue's
    file lock; the reference to the file lock is cleared afterwards.

    """
    self._wpool.TerminateWorkers()

    filelock = self._queue_filelock
    filelock.Close()
    self._queue_filelock = None