Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 178ad717

History | View | Annotate | Download (75.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 76b62028 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 704b51ff Klaus Aehlig
from ganeti import luxi
52 f1da30e6 Michael Hanselmann
from ganeti import opcodes
53 580b1fdd Jose A. Lopes
from ganeti import opcodes_base
54 7a1ecaed Iustin Pop
from ganeti import errors
55 e2715f69 Michael Hanselmann
from ganeti import mcpu
56 7996a135 Iustin Pop
from ganeti import utils
57 04ab05ce Michael Hanselmann
from ganeti import jstore
58 4869595d Petr Pudlak
import ganeti.rpc.node as rpc
59 82b22e19 René Nussbaumer
from ganeti import runtime
60 a744b676 Manuel Franceschini
from ganeti import netutils
61 989a8bee Michael Hanselmann
from ganeti import compat
62 b95479a5 Michael Hanselmann
from ganeti import ht
63 a06c6ae8 Michael Hanselmann
from ganeti import query
64 a06c6ae8 Michael Hanselmann
from ganeti import qlang
65 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
66 cffbbae7 Michael Hanselmann
from ganeti import vcluster
67 e2715f69 Michael Hanselmann
68 fbf0262f Michael Hanselmann
69 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
70 e2715f69 Michael Hanselmann
71 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
72 ebb80afa Guido Trotter
_LOCK = "_lock"
73 ebb80afa Guido Trotter
_QUEUE = "_queue"
74 99bd4f0a Guido Trotter
75 99fb250b Michael Hanselmann
#: Retrieves "id" attribute
76 99fb250b Michael Hanselmann
_GetIdAttr = operator.attrgetter("id")
77 99fb250b Michael Hanselmann
78 498ae1cc Iustin Pop
79 9728ae5d Iustin Pop
class CancelJob(Exception):
80 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
81 fbf0262f Michael Hanselmann

82 fbf0262f Michael Hanselmann
  """
83 fbf0262f Michael Hanselmann
84 fbf0262f Michael Hanselmann
85 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
86 942e2262 Michael Hanselmann
  """Special exception to abort a job when the job queue is shutting down.
87 942e2262 Michael Hanselmann

88 942e2262 Michael Hanselmann
  """
89 942e2262 Michael Hanselmann
90 942e2262 Michael Hanselmann
91 70552c46 Michael Hanselmann
def TimeStampNow():
92 ea03467c Iustin Pop
  """Returns the current timestamp.
93 ea03467c Iustin Pop

94 ea03467c Iustin Pop
  @rtype: tuple
95 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
96 ea03467c Iustin Pop

97 ea03467c Iustin Pop
  """
98 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
99 70552c46 Michael Hanselmann
100 70552c46 Michael Hanselmann
101 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
102 cffbbae7 Michael Hanselmann
  """Updates job queue file after virtualizing filename.
103 cffbbae7 Michael Hanselmann

104 cffbbae7 Michael Hanselmann
  """
105 cffbbae7 Michael Hanselmann
  virt_file_name = vcluster.MakeVirtualPath(file_name)
106 cffbbae7 Michael Hanselmann
  return runner.call_jobqueue_update(names, virt_file_name, content)
107 cffbbae7 Michael Hanselmann
108 cffbbae7 Michael Hanselmann
109 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
110 a06c6ae8 Michael Hanselmann
  """Wrapper for job queries.
111 a06c6ae8 Michael Hanselmann

112 a06c6ae8 Michael Hanselmann
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
113 a06c6ae8 Michael Hanselmann

114 a06c6ae8 Michael Hanselmann
  """
115 a06c6ae8 Michael Hanselmann
  def __init__(self, fields):
116 a06c6ae8 Michael Hanselmann
    """Initializes this class.
117 a06c6ae8 Michael Hanselmann

118 a06c6ae8 Michael Hanselmann
    """
119 a06c6ae8 Michael Hanselmann
    self._query = query.Query(query.JOB_FIELDS, fields)
120 a06c6ae8 Michael Hanselmann
121 a06c6ae8 Michael Hanselmann
  def __call__(self, job):
122 a06c6ae8 Michael Hanselmann
    """Executes a job query using cached field list.
123 a06c6ae8 Michael Hanselmann

124 a06c6ae8 Michael Hanselmann
    """
125 a06c6ae8 Michael Hanselmann
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
126 a06c6ae8 Michael Hanselmann
127 a06c6ae8 Michael Hanselmann
128 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
129 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
130 e2715f69 Michael Hanselmann

131 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
132 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
133 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
134 ea03467c Iustin Pop
  @ivar status: the current status
135 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
136 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
137 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
138 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
139 f1048938 Iustin Pop

140 e2715f69 Michael Hanselmann
  """
141 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
142 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
143 66d895a8 Iustin Pop
               "__weakref__"]
144 66d895a8 Iustin Pop
145 85f03e0d Michael Hanselmann
  def __init__(self, op):
146 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
147 ea03467c Iustin Pop

148 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
149 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
150 ea03467c Iustin Pop

151 ea03467c Iustin Pop
    """
152 85f03e0d Michael Hanselmann
    self.input = op
153 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
154 85f03e0d Michael Hanselmann
    self.result = None
155 85f03e0d Michael Hanselmann
    self.log = []
156 70552c46 Michael Hanselmann
    self.start_timestamp = None
157 b9b5abcb Iustin Pop
    self.exec_timestamp = None
158 70552c46 Michael Hanselmann
    self.end_timestamp = None
159 f1da30e6 Michael Hanselmann
160 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
161 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
162 8f5c488d Michael Hanselmann
163 f1da30e6 Michael Hanselmann
  @classmethod
164 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
165 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
166 ea03467c Iustin Pop

167 ea03467c Iustin Pop
    @type state: dict
168 ea03467c Iustin Pop
    @param state: the serialized state
169 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
170 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
171 ea03467c Iustin Pop

172 ea03467c Iustin Pop
    """
173 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
174 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
175 85f03e0d Michael Hanselmann
    obj.status = state["status"]
176 85f03e0d Michael Hanselmann
    obj.result = state["result"]
177 85f03e0d Michael Hanselmann
    obj.log = state["log"]
178 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
179 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
180 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
181 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
182 f1da30e6 Michael Hanselmann
    return obj
183 f1da30e6 Michael Hanselmann
184 f1da30e6 Michael Hanselmann
  def Serialize(self):
185 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
186 ea03467c Iustin Pop

187 ea03467c Iustin Pop
    @rtype: dict
188 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
189 ea03467c Iustin Pop

190 ea03467c Iustin Pop
    """
191 6c5a7090 Michael Hanselmann
    return {
192 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
193 6c5a7090 Michael Hanselmann
      "status": self.status,
194 6c5a7090 Michael Hanselmann
      "result": self.result,
195 6c5a7090 Michael Hanselmann
      "log": self.log,
196 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
197 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
198 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
199 8f5c488d Michael Hanselmann
      "priority": self.priority,
200 6c5a7090 Michael Hanselmann
      }
201 f1048938 Iustin Pop
202 e2715f69 Michael Hanselmann
203 e2715f69 Michael Hanselmann
class _QueuedJob(object):
204 e2715f69 Michael Hanselmann
  """In-memory job representation.
205 e2715f69 Michael Hanselmann

206 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
207 ea03467c Iustin Pop
  be taken care of by users of this class.
208 ea03467c Iustin Pop

209 ea03467c Iustin Pop
  @type queue: L{JobQueue}
210 ea03467c Iustin Pop
  @ivar queue: the parent queue
211 ea03467c Iustin Pop
  @ivar id: the job ID
212 ea03467c Iustin Pop
  @type ops: list
213 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
214 ea03467c Iustin Pop
  @type log_serial: int
215 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
216 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
217 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
218 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
219 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
220 e2715f69 Michael Hanselmann

221 e2715f69 Michael Hanselmann
  """
222 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
223 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
224 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
225 8a3cd185 Michael Hanselmann
               "__weakref__", "processor_lock", "writable", "archived"]
226 66d895a8 Iustin Pop
227 e0f2bf1e Michele Tartara
  def _AddReasons(self):
228 e0f2bf1e Michele Tartara
    """Extend the reason trail
229 e0f2bf1e Michele Tartara

230 e0f2bf1e Michele Tartara
    Add the reason for all the opcodes of this job to be executed.
231 e0f2bf1e Michele Tartara

232 e0f2bf1e Michele Tartara
    """
233 e0f2bf1e Michele Tartara
    count = 0
234 e0f2bf1e Michele Tartara
    for queued_op in self.ops:
235 e0f2bf1e Michele Tartara
      op = queued_op.input
236 580b1fdd Jose A. Lopes
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
237 e0f2bf1e Michele Tartara
      reason_text = "job=%d;index=%d" % (self.id, count)
238 e0f2bf1e Michele Tartara
      reason = getattr(op, "reason", [])
239 e0f2bf1e Michele Tartara
      reason.append((reason_src, reason_text, utils.EpochNano()))
240 e0f2bf1e Michele Tartara
      op.reason = reason
241 e0f2bf1e Michele Tartara
      count = count + 1
242 e0f2bf1e Michele Tartara
243 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
244 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
245 ea03467c Iustin Pop

246 ea03467c Iustin Pop
    @type queue: L{JobQueue}
247 ea03467c Iustin Pop
    @param queue: our parent queue
248 ea03467c Iustin Pop
    @type job_id: job_id
249 ea03467c Iustin Pop
    @param job_id: our job id
250 ea03467c Iustin Pop
    @type ops: list
251 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
252 ea03467c Iustin Pop
        in _QueuedOpCodes
253 c0f6d0d8 Michael Hanselmann
    @type writable: bool
254 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
255 ea03467c Iustin Pop

256 ea03467c Iustin Pop
    """
257 e2715f69 Michael Hanselmann
    if not ops:
258 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
259 e2715f69 Michael Hanselmann
260 85f03e0d Michael Hanselmann
    self.queue = queue
261 76b62028 Iustin Pop
    self.id = int(job_id)
262 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
263 e0f2bf1e Michele Tartara
    self._AddReasons()
264 6c5a7090 Michael Hanselmann
    self.log_serial = 0
265 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
266 c56ec146 Iustin Pop
    self.start_timestamp = None
267 c56ec146 Iustin Pop
    self.end_timestamp = None
268 8a3cd185 Michael Hanselmann
    self.archived = False
269 6c5a7090 Michael Hanselmann
270 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
271 fa4aa6b4 Michael Hanselmann
272 8a3cd185 Michael Hanselmann
    assert not self.archived, "New jobs can not be marked as archived"
273 8a3cd185 Michael Hanselmann
274 fa4aa6b4 Michael Hanselmann
  @staticmethod
275 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
276 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
277 fa4aa6b4 Michael Hanselmann

278 fa4aa6b4 Michael Hanselmann
    """
279 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
280 03b63608 Michael Hanselmann
    obj.ops_iter = None
281 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
282 f8a4adfa Michael Hanselmann
283 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
284 f8a4adfa Michael Hanselmann
    if writable:
285 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
286 f8a4adfa Michael Hanselmann
    else:
287 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
288 be760ba8 Michael Hanselmann
289 9fa2e150 Michael Hanselmann
  def __repr__(self):
290 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
291 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
292 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
293 9fa2e150 Michael Hanselmann
294 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
295 9fa2e150 Michael Hanselmann
296 f1da30e6 Michael Hanselmann
  @classmethod
297 8a3cd185 Michael Hanselmann
  def Restore(cls, queue, state, writable, archived):
298 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
299 ea03467c Iustin Pop

300 ea03467c Iustin Pop
    @type queue: L{JobQueue}
301 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
302 ea03467c Iustin Pop
    @type state: dict
303 ea03467c Iustin Pop
    @param state: the serialized state
304 c0f6d0d8 Michael Hanselmann
    @type writable: bool
305 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
306 8a3cd185 Michael Hanselmann
    @type archived: bool
307 8a3cd185 Michael Hanselmann
    @param archived: Whether job was already archived
308 ea03467c Iustin Pop
    @rtype: _JobQueue
309 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
310 ea03467c Iustin Pop

311 ea03467c Iustin Pop
    """
312 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
313 85f03e0d Michael Hanselmann
    obj.queue = queue
314 76b62028 Iustin Pop
    obj.id = int(state["id"])
315 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
316 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
317 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
318 8a3cd185 Michael Hanselmann
    obj.archived = archived
319 6c5a7090 Michael Hanselmann
320 6c5a7090 Michael Hanselmann
    obj.ops = []
321 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
322 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
323 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
324 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
325 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
326 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
327 6c5a7090 Michael Hanselmann
328 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
329 be760ba8 Michael Hanselmann
330 f1da30e6 Michael Hanselmann
    return obj
331 f1da30e6 Michael Hanselmann
332 f1da30e6 Michael Hanselmann
  def Serialize(self):
333 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
334 ea03467c Iustin Pop

335 ea03467c Iustin Pop
    @rtype: dict
336 ea03467c Iustin Pop
    @return: the serialized state
337 ea03467c Iustin Pop

338 ea03467c Iustin Pop
    """
339 f1da30e6 Michael Hanselmann
    return {
340 f1da30e6 Michael Hanselmann
      "id": self.id,
341 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
342 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
343 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
344 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
345 f1da30e6 Michael Hanselmann
      }
346 f1da30e6 Michael Hanselmann
347 85f03e0d Michael Hanselmann
  def CalcStatus(self):
348 ea03467c Iustin Pop
    """Compute the status of this job.
349 ea03467c Iustin Pop

350 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
351 ea03467c Iustin Pop
    based on their status, computes the job status.
352 ea03467c Iustin Pop

353 ea03467c Iustin Pop
    The algorithm is:
354 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
355 ea03467c Iustin Pop
        status will be the same
356 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
357 ea03467c Iustin Pop
          - waitlock
358 fbf0262f Michael Hanselmann
          - canceling
359 ea03467c Iustin Pop
          - running
360 ea03467c Iustin Pop

361 ea03467c Iustin Pop
        will determine the job status
362 ea03467c Iustin Pop

363 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
364 ea03467c Iustin Pop
        and the job status will be the same
365 ea03467c Iustin Pop

366 ea03467c Iustin Pop
    @return: the job status
367 ea03467c Iustin Pop

368 ea03467c Iustin Pop
    """
369 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
370 e2715f69 Michael Hanselmann
371 e2715f69 Michael Hanselmann
    all_success = True
372 85f03e0d Michael Hanselmann
    for op in self.ops:
373 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
374 e2715f69 Michael Hanselmann
        continue
375 e2715f69 Michael Hanselmann
376 e2715f69 Michael Hanselmann
      all_success = False
377 e2715f69 Michael Hanselmann
378 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
379 e2715f69 Michael Hanselmann
        pass
380 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
381 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
382 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
383 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
384 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
385 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
386 fbf0262f Michael Hanselmann
        break
387 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
388 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
389 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
390 f1da30e6 Michael Hanselmann
        break
391 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
392 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
393 4cb1d919 Michael Hanselmann
        break
394 e2715f69 Michael Hanselmann
395 e2715f69 Michael Hanselmann
    if all_success:
396 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
397 e2715f69 Michael Hanselmann
398 e2715f69 Michael Hanselmann
    return status
399 e2715f69 Michael Hanselmann
400 8f5c488d Michael Hanselmann
  def CalcPriority(self):
401 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
402 8f5c488d Michael Hanselmann

403 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
404 8f5c488d Michael Hanselmann
    priority is used.
405 8f5c488d Michael Hanselmann

406 8f5c488d Michael Hanselmann
    @rtype: int
407 8f5c488d Michael Hanselmann

408 8f5c488d Michael Hanselmann
    """
409 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
410 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
411 8f5c488d Michael Hanselmann
412 8f5c488d Michael Hanselmann
    if not priorities:
413 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
414 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
415 8f5c488d Michael Hanselmann
416 8f5c488d Michael Hanselmann
    return min(priorities)
417 8f5c488d Michael Hanselmann
418 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
419 ea03467c Iustin Pop
    """Selectively returns the log entries.
420 ea03467c Iustin Pop

421 ea03467c Iustin Pop
    @type newer_than: None or int
422 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
423 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
424 ea03467c Iustin Pop
        than this value
425 ea03467c Iustin Pop
    @rtype: list
426 ea03467c Iustin Pop
    @return: the list of the log entries selected
427 ea03467c Iustin Pop

428 ea03467c Iustin Pop
    """
429 6c5a7090 Michael Hanselmann
    if newer_than is None:
430 6c5a7090 Michael Hanselmann
      serial = -1
431 6c5a7090 Michael Hanselmann
    else:
432 6c5a7090 Michael Hanselmann
      serial = newer_than
433 6c5a7090 Michael Hanselmann
434 6c5a7090 Michael Hanselmann
    entries = []
435 6c5a7090 Michael Hanselmann
    for op in self.ops:
436 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
437 6c5a7090 Michael Hanselmann
438 6c5a7090 Michael Hanselmann
    return entries
439 6c5a7090 Michael Hanselmann
440 6a290889 Guido Trotter
  def GetInfo(self, fields):
441 6a290889 Guido Trotter
    """Returns information about a job.
442 6a290889 Guido Trotter

443 6a290889 Guido Trotter
    @type fields: list
444 6a290889 Guido Trotter
    @param fields: names of fields to return
445 6a290889 Guido Trotter
    @rtype: list
446 6a290889 Guido Trotter
    @return: list with one element for each field
447 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
448 6a290889 Guido Trotter
        has been passed
449 6a290889 Guido Trotter

450 6a290889 Guido Trotter
    """
451 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
452 6a290889 Guido Trotter
453 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
454 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
455 34327f51 Iustin Pop

456 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
457 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
458 34327f51 Iustin Pop
    finalised are not changed.
459 34327f51 Iustin Pop

460 34327f51 Iustin Pop
    @param status: a given opcode status
461 34327f51 Iustin Pop
    @param result: the opcode result
462 34327f51 Iustin Pop

463 34327f51 Iustin Pop
    """
464 747f6113 Michael Hanselmann
    not_marked = True
465 747f6113 Michael Hanselmann
    for op in self.ops:
466 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
467 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
468 747f6113 Michael Hanselmann
        continue
469 747f6113 Michael Hanselmann
      op.status = status
470 747f6113 Michael Hanselmann
      op.result = result
471 747f6113 Michael Hanselmann
      not_marked = False
472 34327f51 Iustin Pop
473 66bd7445 Michael Hanselmann
  def Finalize(self):
474 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
475 66bd7445 Michael Hanselmann

476 66bd7445 Michael Hanselmann
    """
477 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
478 66bd7445 Michael Hanselmann
479 099b2870 Michael Hanselmann
  def Cancel(self):
480 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
481 a0d2fe2c Michael Hanselmann

482 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
483 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
484 a0d2fe2c Michael Hanselmann
      as canceling and a text message
485 a0d2fe2c Michael Hanselmann

486 a0d2fe2c Michael Hanselmann
    """
487 099b2870 Michael Hanselmann
    status = self.CalcStatus()
488 099b2870 Michael Hanselmann
489 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
490 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
491 099b2870 Michael Hanselmann
                             "Job canceled by request")
492 66bd7445 Michael Hanselmann
      self.Finalize()
493 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
494 099b2870 Michael Hanselmann
495 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
496 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
497 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
498 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
499 099b2870 Michael Hanselmann
500 86b16e9d Michael Hanselmann
    else:
501 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
502 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
503 099b2870 Michael Hanselmann
504 4679547e Michael Hanselmann
  def ChangePriority(self, priority):
505 4679547e Michael Hanselmann
    """Changes the job priority.
506 4679547e Michael Hanselmann

507 4679547e Michael Hanselmann
    @type priority: int
508 4679547e Michael Hanselmann
    @param priority: New priority
509 4679547e Michael Hanselmann
    @rtype: tuple; (bool, string)
510 4679547e Michael Hanselmann
    @return: Boolean describing whether job's priority was successfully changed
511 4679547e Michael Hanselmann
      and a text message
512 4679547e Michael Hanselmann

513 4679547e Michael Hanselmann
    """
514 4679547e Michael Hanselmann
    status = self.CalcStatus()
515 4679547e Michael Hanselmann
516 4679547e Michael Hanselmann
    if status in constants.JOBS_FINALIZED:
517 4679547e Michael Hanselmann
      return (False, "Job %s is finished" % self.id)
518 4679547e Michael Hanselmann
    elif status == constants.JOB_STATUS_CANCELING:
519 4679547e Michael Hanselmann
      return (False, "Job %s is cancelling" % self.id)
520 4679547e Michael Hanselmann
    else:
521 4679547e Michael Hanselmann
      assert status in (constants.JOB_STATUS_QUEUED,
522 4679547e Michael Hanselmann
                        constants.JOB_STATUS_WAITING,
523 4679547e Michael Hanselmann
                        constants.JOB_STATUS_RUNNING)
524 4679547e Michael Hanselmann
525 4679547e Michael Hanselmann
      changed = False
526 4679547e Michael Hanselmann
      for op in self.ops:
527 4679547e Michael Hanselmann
        if (op.status == constants.OP_STATUS_RUNNING or
528 4679547e Michael Hanselmann
            op.status in constants.OPS_FINALIZED):
529 4679547e Michael Hanselmann
          assert not changed, \
530 4679547e Michael Hanselmann
            ("Found opcode for which priority should not be changed after"
531 4679547e Michael Hanselmann
             " priority has been changed for previous opcodes")
532 4679547e Michael Hanselmann
          continue
533 4679547e Michael Hanselmann
534 4679547e Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
535 4679547e Michael Hanselmann
                             constants.OP_STATUS_WAITING)
536 4679547e Michael Hanselmann
537 4679547e Michael Hanselmann
        changed = True
538 4679547e Michael Hanselmann
539 3c631ea2 Michael Hanselmann
        # Set new priority (doesn't modify opcode input)
540 4679547e Michael Hanselmann
        op.priority = priority
541 4679547e Michael Hanselmann
542 4679547e Michael Hanselmann
      if changed:
543 4679547e Michael Hanselmann
        return (True, ("Priorities of pending opcodes for job %s have been"
544 4679547e Michael Hanselmann
                       " changed to %s" % (self.id, priority)))
545 4679547e Michael Hanselmann
      else:
546 4679547e Michael Hanselmann
        return (False, "Job %s had no pending opcodes" % self.id)
547 4679547e Michael Hanselmann
548 f1048938 Iustin Pop
549 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
550 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
551 031a3e57 Michael Hanselmann
    """Initializes this class.
552 ea03467c Iustin Pop

553 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
554 031a3e57 Michael Hanselmann
    @param queue: Job queue
555 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
556 031a3e57 Michael Hanselmann
    @param job: Job object
557 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
558 031a3e57 Michael Hanselmann
    @param op: OpCode
559 031a3e57 Michael Hanselmann

560 031a3e57 Michael Hanselmann
    """
561 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
562 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
563 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
564 031a3e57 Michael Hanselmann
565 031a3e57 Michael Hanselmann
    self._queue = queue
566 031a3e57 Michael Hanselmann
    self._job = job
567 031a3e57 Michael Hanselmann
    self._op = op
568 031a3e57 Michael Hanselmann
569 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
570 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
571 dc1e2262 Michael Hanselmann

572 dc1e2262 Michael Hanselmann
    """
573 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
574 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
575 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
576 dc1e2262 Michael Hanselmann
      raise CancelJob()
577 dc1e2262 Michael Hanselmann
578 942e2262 Michael Hanselmann
    # See if queue is shutting down
579 942e2262 Michael Hanselmann
    if not self._queue.AcceptingJobsUnlocked():
580 942e2262 Michael Hanselmann
      logging.debug("Queue is shutting down")
581 942e2262 Michael Hanselmann
      raise QueueShutdown()
582 942e2262 Michael Hanselmann
583 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
584 031a3e57 Michael Hanselmann
  def NotifyStart(self):
585 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
586 e92376d7 Iustin Pop

587 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
588 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
589 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
590 47099cd1 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
591 e92376d7 Iustin Pop

592 e92376d7 Iustin Pop
    """
593 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
594 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
595 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
596 fbf0262f Michael Hanselmann
597 271daef8 Iustin Pop
    # Cancel here if we were asked to
598 dc1e2262 Michael Hanselmann
    self._CheckCancel()
599 fbf0262f Michael Hanselmann
600 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
601 9bdab621 Michael Hanselmann
602 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
603 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
604 271daef8 Iustin Pop
605 271daef8 Iustin Pop
    # And finally replicate the job status
606 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
607 031a3e57 Michael Hanselmann
608 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
609 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
610 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
611 9bf5e01f Guido Trotter

612 9bf5e01f Guido Trotter
    """
613 9bf5e01f Guido Trotter
    self._job.log_serial += 1
614 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
615 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
616 9bf5e01f Guido Trotter
617 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
618 031a3e57 Michael Hanselmann
    """Append a log entry.
619 031a3e57 Michael Hanselmann

620 031a3e57 Michael Hanselmann
    """
621 031a3e57 Michael Hanselmann
    assert len(args) < 3
622 031a3e57 Michael Hanselmann
623 031a3e57 Michael Hanselmann
    if len(args) == 1:
624 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
625 031a3e57 Michael Hanselmann
      log_msg = args[0]
626 031a3e57 Michael Hanselmann
    else:
627 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
628 031a3e57 Michael Hanselmann
629 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
630 031a3e57 Michael Hanselmann
    # precision.
631 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
632 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
633 031a3e57 Michael Hanselmann
634 e4e59de8 Michael Hanselmann
  def CurrentPriority(self):
635 e4e59de8 Michael Hanselmann
    """Returns current priority for opcode.
636 ef2df7d3 Michael Hanselmann

637 ef2df7d3 Michael Hanselmann
    """
638 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
639 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
640 dc1e2262 Michael Hanselmann
641 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
642 dc1e2262 Michael Hanselmann
    self._CheckCancel()
643 dc1e2262 Michael Hanselmann
644 e4e59de8 Michael Hanselmann
    return self._op.priority
645 e4e59de8 Michael Hanselmann
646 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
647 6a373640 Michael Hanselmann
    """Submits jobs for processing.
648 6a373640 Michael Hanselmann

649 6a373640 Michael Hanselmann
    See L{JobQueue.SubmitManyJobs}.
650 6a373640 Michael Hanselmann

651 6a373640 Michael Hanselmann
    """
652 6a373640 Michael Hanselmann
    # Locking is done in job queue
653 6a373640 Michael Hanselmann
    return self._queue.SubmitManyJobs(jobs)
654 6a373640 Michael Hanselmann
655 031a3e57 Michael Hanselmann
656 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
657 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
658 989a8bee Michael Hanselmann
    """Initializes this class.
659 6c2549d6 Guido Trotter

660 989a8bee Michael Hanselmann
    @type fields: list of strings
661 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
662 989a8bee Michael Hanselmann
    @type prev_job_info: string
663 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
664 989a8bee Michael Hanselmann
    @type prev_log_serial: string
665 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
666 6c2549d6 Guido Trotter

667 989a8bee Michael Hanselmann
    """
668 dc2879ea Michael Hanselmann
    self._squery = _SimpleJobQuery(fields)
669 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
670 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
671 6c2549d6 Guido Trotter
672 989a8bee Michael Hanselmann
  def __call__(self, job):
673 989a8bee Michael Hanselmann
    """Checks whether job has changed.
674 6c2549d6 Guido Trotter

675 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
676 989a8bee Michael Hanselmann
    @param job: Job object
677 6c2549d6 Guido Trotter

678 6c2549d6 Guido Trotter
    """
679 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Expected read-only job"
680 c0f6d0d8 Michael Hanselmann
681 989a8bee Michael Hanselmann
    status = job.CalcStatus()
682 dc2879ea Michael Hanselmann
    job_info = self._squery(job)
683 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
684 6c2549d6 Guido Trotter
685 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
686 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
687 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
688 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
689 6c2549d6 Guido Trotter
    # significantly different.
690 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
691 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
692 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
693 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
694 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
695 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
696 6c2549d6 Guido Trotter
697 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
698 6c2549d6 Guido Trotter
    # no changes.
699 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
700 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
701 47099cd1 Michael Hanselmann
                       constants.JOB_STATUS_WAITING) or
702 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
703 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
704 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
705 989a8bee Michael Hanselmann
      return (job_info, log_entries)
706 6c2549d6 Guido Trotter
707 989a8bee Michael Hanselmann
    return None
708 989a8bee Michael Hanselmann
709 989a8bee Michael Hanselmann
710 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
711 383477e9 Michael Hanselmann
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
712 989a8bee Michael Hanselmann
    """Initializes this class.
713 989a8bee Michael Hanselmann

714 989a8bee Michael Hanselmann
    @type filename: string
715 989a8bee Michael Hanselmann
    @param filename: Path to job file
716 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
717 6c2549d6 Guido Trotter

718 989a8bee Michael Hanselmann
    """
719 383477e9 Michael Hanselmann
    self._wm = _inotify_wm_cls()
720 989a8bee Michael Hanselmann
    self._inotify_handler = \
721 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
722 989a8bee Michael Hanselmann
    self._notifier = \
723 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
724 989a8bee Michael Hanselmann
    try:
725 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
726 989a8bee Michael Hanselmann
    except Exception:
727 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
728 989a8bee Michael Hanselmann
      self._notifier.stop()
729 989a8bee Michael Hanselmann
      raise
730 989a8bee Michael Hanselmann
731 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
732 989a8bee Michael Hanselmann
    """Callback for inotify.
733 989a8bee Michael Hanselmann

734 989a8bee Michael Hanselmann
    """
735 6c2549d6 Guido Trotter
    if not notifier_enabled:
736 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
737 989a8bee Michael Hanselmann
738 989a8bee Michael Hanselmann
  def Wait(self, timeout):
739 989a8bee Michael Hanselmann
    """Waits for the job file to change.
740 989a8bee Michael Hanselmann

741 989a8bee Michael Hanselmann
    @type timeout: float
742 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
743 989a8bee Michael Hanselmann
    @return: Whether there have been events
744 989a8bee Michael Hanselmann

745 989a8bee Michael Hanselmann
    """
746 989a8bee Michael Hanselmann
    assert timeout >= 0
747 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
748 989a8bee Michael Hanselmann
    if have_events:
749 989a8bee Michael Hanselmann
      self._notifier.read_events()
750 989a8bee Michael Hanselmann
    self._notifier.process_events()
751 989a8bee Michael Hanselmann
    return have_events
752 989a8bee Michael Hanselmann
753 989a8bee Michael Hanselmann
  def Close(self):
754 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
755 989a8bee Michael Hanselmann

756 989a8bee Michael Hanselmann
    """
757 989a8bee Michael Hanselmann
    self._notifier.stop()
758 989a8bee Michael Hanselmann
759 989a8bee Michael Hanselmann
760 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
761 383477e9 Michael Hanselmann
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
762 989a8bee Michael Hanselmann
    """Initializes this class.
763 989a8bee Michael Hanselmann

764 989a8bee Michael Hanselmann
    @type filename: string
765 989a8bee Michael Hanselmann
    @param filename: Path to job file
766 989a8bee Michael Hanselmann

767 989a8bee Michael Hanselmann
    """
768 989a8bee Michael Hanselmann
    self._filewaiter = None
769 989a8bee Michael Hanselmann
    self._filename = filename
770 383477e9 Michael Hanselmann
    self._waiter_cls = _waiter_cls
771 6c2549d6 Guido Trotter
772 989a8bee Michael Hanselmann
  def Wait(self, timeout):
773 989a8bee Michael Hanselmann
    """Waits for a job to change.
774 6c2549d6 Guido Trotter

775 989a8bee Michael Hanselmann
    @type timeout: float
776 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
777 989a8bee Michael Hanselmann
    @return: Whether there have been events
778 989a8bee Michael Hanselmann

779 989a8bee Michael Hanselmann
    """
780 989a8bee Michael Hanselmann
    if self._filewaiter:
781 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
782 989a8bee Michael Hanselmann
783 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
784 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
785 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
786 989a8bee Michael Hanselmann
    # race condition.
787 383477e9 Michael Hanselmann
    self._filewaiter = self._waiter_cls(self._filename)
788 989a8bee Michael Hanselmann
789 989a8bee Michael Hanselmann
    return True
790 989a8bee Michael Hanselmann
791 989a8bee Michael Hanselmann
  def Close(self):
792 989a8bee Michael Hanselmann
    """Closes underlying waiter.
793 989a8bee Michael Hanselmann

794 989a8bee Michael Hanselmann
    """
795 989a8bee Michael Hanselmann
    if self._filewaiter:
796 989a8bee Michael Hanselmann
      self._filewaiter.Close()
797 989a8bee Michael Hanselmann
798 989a8bee Michael Hanselmann
799 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
800 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
801 989a8bee Michael Hanselmann

802 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
803 989a8bee Michael Hanselmann
  the current job status has changed.
804 989a8bee Michael Hanselmann

805 989a8bee Michael Hanselmann
  """
806 989a8bee Michael Hanselmann
  @staticmethod
807 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
808 dfc8824a Michael Hanselmann
    if counter.next() > 0:
809 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
810 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
811 dfc8824a Michael Hanselmann
      # changes/messages.
812 dfc8824a Michael Hanselmann
      time.sleep(0.1)
813 dfc8824a Michael Hanselmann
814 989a8bee Michael Hanselmann
    job = job_load_fn()
815 989a8bee Michael Hanselmann
    if not job:
816 989a8bee Michael Hanselmann
      raise errors.JobLost()
817 989a8bee Michael Hanselmann
818 989a8bee Michael Hanselmann
    result = check_fn(job)
819 989a8bee Michael Hanselmann
    if result is None:
820 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
821 989a8bee Michael Hanselmann
822 989a8bee Michael Hanselmann
    return result
823 989a8bee Michael Hanselmann
824 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
825 383477e9 Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout,
826 383477e9 Michael Hanselmann
               _waiter_cls=_JobChangesWaiter):
827 989a8bee Michael Hanselmann
    """Waits for changes on a job.
828 989a8bee Michael Hanselmann

829 989a8bee Michael Hanselmann
    @type filename: string
830 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
831 989a8bee Michael Hanselmann
    @type job_load_fn: callable
832 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
833 989a8bee Michael Hanselmann
    @type fields: list of strings
834 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
835 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
836 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
837 989a8bee Michael Hanselmann
    @type prev_log_serial: int
838 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
839 989a8bee Michael Hanselmann
    @type timeout: float
840 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
841 989a8bee Michael Hanselmann

842 989a8bee Michael Hanselmann
    """
843 dfc8824a Michael Hanselmann
    counter = itertools.count()
844 6c2549d6 Guido Trotter
    try:
845 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
846 383477e9 Michael Hanselmann
      waiter = _waiter_cls(filename)
847 989a8bee Michael Hanselmann
      try:
848 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
849 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
850 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
851 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
852 989a8bee Michael Hanselmann
      finally:
853 989a8bee Michael Hanselmann
        waiter.Close()
854 383477e9 Michael Hanselmann
    except errors.JobLost:
855 6c2549d6 Guido Trotter
      return None
856 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
857 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
858 6c2549d6 Guido Trotter
859 6c2549d6 Guido Trotter
860 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
861 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
862 6760e4ed Michael Hanselmann

863 6760e4ed Michael Hanselmann
  """
864 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
865 6760e4ed Michael Hanselmann
    to_encode = err
866 6760e4ed Michael Hanselmann
  else:
867 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
868 6760e4ed Michael Hanselmann
869 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
870 6760e4ed Michael Hanselmann
871 6760e4ed Michael Hanselmann
872 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
873 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
874 26d3fd2f Michael Hanselmann
    """Initializes this class.
875 26d3fd2f Michael Hanselmann

876 26d3fd2f Michael Hanselmann
    """
877 26d3fd2f Michael Hanselmann
    self._fn = fn
878 26d3fd2f Michael Hanselmann
    self._next = None
879 26d3fd2f Michael Hanselmann
880 26d3fd2f Michael Hanselmann
  def _Advance(self):
881 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
882 26d3fd2f Michael Hanselmann

883 26d3fd2f Michael Hanselmann
    """
884 26d3fd2f Michael Hanselmann
    if self._next is None:
885 26d3fd2f Michael Hanselmann
      self._next = self._fn()
886 26d3fd2f Michael Hanselmann
887 26d3fd2f Michael Hanselmann
  def Peek(self):
888 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
889 26d3fd2f Michael Hanselmann

890 26d3fd2f Michael Hanselmann
    """
891 26d3fd2f Michael Hanselmann
    self._Advance()
892 26d3fd2f Michael Hanselmann
    return self._next
893 26d3fd2f Michael Hanselmann
894 26d3fd2f Michael Hanselmann
  def Next(self):
895 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
896 26d3fd2f Michael Hanselmann

897 26d3fd2f Michael Hanselmann
    """
898 26d3fd2f Michael Hanselmann
    self._Advance()
899 26d3fd2f Michael Hanselmann
    result = self._next
900 26d3fd2f Michael Hanselmann
    self._next = None
901 26d3fd2f Michael Hanselmann
    return result
902 26d3fd2f Michael Hanselmann
903 26d3fd2f Michael Hanselmann
904 b80cc518 Michael Hanselmann
class _OpExecContext:
905 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
906 b80cc518 Michael Hanselmann
    """Initializes this class.
907 b80cc518 Michael Hanselmann

908 b80cc518 Michael Hanselmann
    """
909 b80cc518 Michael Hanselmann
    self.op = op
910 b80cc518 Michael Hanselmann
    self.index = index
911 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
912 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
913 b80cc518 Michael Hanselmann
914 b95479a5 Michael Hanselmann
    # Create local copy to modify
915 580b1fdd Jose A. Lopes
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
916 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
917 b95479a5 Michael Hanselmann
    else:
918 b95479a5 Michael Hanselmann
      self.jobdeps = None
919 b95479a5 Michael Hanselmann
920 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
921 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
922 26d3fd2f Michael Hanselmann
923 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
924 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
925 26d3fd2f Michael Hanselmann

926 26d3fd2f Michael Hanselmann
    """
927 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
928 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
929 26d3fd2f Michael Hanselmann
930 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
931 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
932 26d3fd2f Michael Hanselmann

933 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
934 26d3fd2f Michael Hanselmann

935 26d3fd2f Michael Hanselmann
    """
936 26d3fd2f Michael Hanselmann
    op = self.op
937 26d3fd2f Michael Hanselmann
938 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
939 26d3fd2f Michael Hanselmann
    # for locks?
940 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
941 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
942 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
943 26d3fd2f Michael Hanselmann
      op.priority -= 1
944 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
945 26d3fd2f Michael Hanselmann
      return True
946 26d3fd2f Michael Hanselmann
947 26d3fd2f Michael Hanselmann
    return False
948 26d3fd2f Michael Hanselmann
949 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
950 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
951 26d3fd2f Michael Hanselmann

952 26d3fd2f Michael Hanselmann
    """
953 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
954 26d3fd2f Michael Hanselmann
955 b80cc518 Michael Hanselmann
956 be760ba8 Michael Hanselmann
class _JobProcessor(object):
957 75d81fc8 Michael Hanselmann
  (DEFER,
958 75d81fc8 Michael Hanselmann
   WAITDEP,
959 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
960 75d81fc8 Michael Hanselmann
961 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
962 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
963 be760ba8 Michael Hanselmann
    """Initializes this class.
964 be760ba8 Michael Hanselmann

965 be760ba8 Michael Hanselmann
    """
966 be760ba8 Michael Hanselmann
    self.queue = queue
967 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
968 be760ba8 Michael Hanselmann
    self.job = job
969 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
970 be760ba8 Michael Hanselmann
971 be760ba8 Michael Hanselmann
  @staticmethod
972 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
973 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
974 be760ba8 Michael Hanselmann

975 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
976 be760ba8 Michael Hanselmann
    @param job: Job object
977 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
978 be760ba8 Michael Hanselmann

979 be760ba8 Michael Hanselmann
    """
980 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
981 be760ba8 Michael Hanselmann
    # lookups
982 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
983 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
984 03b63608 Michael Hanselmann
    if job.ops_iter is None:
985 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
986 be760ba8 Michael Hanselmann
987 be760ba8 Michael Hanselmann
    # Find next opcode to run
988 be760ba8 Michael Hanselmann
    while True:
989 be760ba8 Michael Hanselmann
      try:
990 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
991 be760ba8 Michael Hanselmann
      except StopIteration:
992 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
993 be760ba8 Michael Hanselmann
994 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
995 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
996 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
997 be760ba8 Michael Hanselmann
998 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
999 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
1000 be760ba8 Michael Hanselmann
1001 66bd7445 Michael Hanselmann
      if op.status not in constants.OPS_FINALIZED:
1002 66bd7445 Michael Hanselmann
        return opctx
1003 be760ba8 Michael Hanselmann
1004 66bd7445 Michael Hanselmann
      # This is a job that was partially completed before master daemon
1005 66bd7445 Michael Hanselmann
      # shutdown, so it can be expected that some opcodes are already
1006 66bd7445 Michael Hanselmann
      # completed successfully (if any did error out, then the whole job
1007 66bd7445 Michael Hanselmann
      # should have been aborted and not resubmitted for processing).
1008 66bd7445 Michael Hanselmann
      logging.info("%s: opcode %s already processed, skipping",
1009 66bd7445 Michael Hanselmann
                   opctx.log_prefix, opctx.summary)
1010 be760ba8 Michael Hanselmann
1011 be760ba8 Michael Hanselmann
  @staticmethod
1012 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
1013 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
1014 be760ba8 Michael Hanselmann

1015 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
1016 be760ba8 Michael Hanselmann

1017 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
1018 be760ba8 Michael Hanselmann
    @param job: Job object
1019 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
1020 a38e8674 Michael Hanselmann
    @param op: Opcode object
1021 be760ba8 Michael Hanselmann

1022 be760ba8 Michael Hanselmann
    """
1023 be760ba8 Michael Hanselmann
    assert op in job.ops
1024 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
1025 47099cd1 Michael Hanselmann
                         constants.OP_STATUS_WAITING)
1026 5fd6b694 Michael Hanselmann
1027 5fd6b694 Michael Hanselmann
    update = False
1028 be760ba8 Michael Hanselmann
1029 be760ba8 Michael Hanselmann
    op.result = None
1030 5fd6b694 Michael Hanselmann
1031 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
1032 47099cd1 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITING
1033 5fd6b694 Michael Hanselmann
      update = True
1034 5fd6b694 Michael Hanselmann
1035 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
1036 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
1037 5fd6b694 Michael Hanselmann
      update = True
1038 be760ba8 Michael Hanselmann
1039 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
1040 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
1041 5fd6b694 Michael Hanselmann
      update = True
1042 5fd6b694 Michael Hanselmann
1043 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1044 5fd6b694 Michael Hanselmann
1045 5fd6b694 Michael Hanselmann
    return update
1046 be760ba8 Michael Hanselmann
1047 b95479a5 Michael Hanselmann
  @staticmethod
1048 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
1049 b95479a5 Michael Hanselmann
    """Checks if an opcode has dependencies and if so, processes them.
1050 b95479a5 Michael Hanselmann

1051 b95479a5 Michael Hanselmann
    @type queue: L{JobQueue}
1052 b95479a5 Michael Hanselmann
    @param queue: Queue object
1053 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1054 b95479a5 Michael Hanselmann
    @param job: Job object
1055 b95479a5 Michael Hanselmann
    @type opctx: L{_OpExecContext}
1056 b95479a5 Michael Hanselmann
    @param opctx: Opcode execution context
1057 b95479a5 Michael Hanselmann
    @rtype: bool
1058 b95479a5 Michael Hanselmann
    @return: Whether opcode will be re-scheduled by dependency tracker
1059 b95479a5 Michael Hanselmann

1060 b95479a5 Michael Hanselmann
    """
1061 b95479a5 Michael Hanselmann
    op = opctx.op
1062 b95479a5 Michael Hanselmann
1063 b95479a5 Michael Hanselmann
    result = False
1064 b95479a5 Michael Hanselmann
1065 b95479a5 Michael Hanselmann
    while opctx.jobdeps:
1066 b95479a5 Michael Hanselmann
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1067 b95479a5 Michael Hanselmann
1068 b95479a5 Michael Hanselmann
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1069 b95479a5 Michael Hanselmann
                                                          dep_status)
1070 b95479a5 Michael Hanselmann
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1071 b95479a5 Michael Hanselmann
1072 b95479a5 Michael Hanselmann
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1073 b95479a5 Michael Hanselmann
1074 b95479a5 Michael Hanselmann
      if depresult == _JobDependencyManager.CONTINUE:
1075 b95479a5 Michael Hanselmann
        # Remove dependency and continue
1076 b95479a5 Michael Hanselmann
        opctx.jobdeps.pop(0)
1077 b95479a5 Michael Hanselmann
1078 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.WAIT:
1079 b95479a5 Michael Hanselmann
        # Need to wait for notification, dependency tracker will re-add job
1080 b95479a5 Michael Hanselmann
        # to workerpool
1081 b95479a5 Michael Hanselmann
        result = True
1082 b95479a5 Michael Hanselmann
        break
1083 b95479a5 Michael Hanselmann
1084 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.CANCEL:
1085 b95479a5 Michael Hanselmann
        # Job was cancelled, cancel this job as well
1086 b95479a5 Michael Hanselmann
        job.Cancel()
1087 b95479a5 Michael Hanselmann
        assert op.status == constants.OP_STATUS_CANCELING
1088 b95479a5 Michael Hanselmann
        break
1089 b95479a5 Michael Hanselmann
1090 b95479a5 Michael Hanselmann
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1091 b95479a5 Michael Hanselmann
                         _JobDependencyManager.ERROR):
1092 b95479a5 Michael Hanselmann
        # Job failed or there was an error, this job must fail
1093 b95479a5 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1094 b95479a5 Michael Hanselmann
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1095 b95479a5 Michael Hanselmann
        break
1096 b95479a5 Michael Hanselmann
1097 b95479a5 Michael Hanselmann
      else:
1098 b95479a5 Michael Hanselmann
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1099 b95479a5 Michael Hanselmann
                                     depresult)
1100 b95479a5 Michael Hanselmann
1101 b95479a5 Michael Hanselmann
    return result
1102 b95479a5 Michael Hanselmann
1103 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1104 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1105 be760ba8 Michael Hanselmann

1106 be760ba8 Michael Hanselmann
    """
1107 b80cc518 Michael Hanselmann
    op = opctx.op
1108 b80cc518 Michael Hanselmann
1109 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1110 be760ba8 Michael Hanselmann
1111 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1112 26d3fd2f Michael Hanselmann
1113 be760ba8 Michael Hanselmann
    try:
1114 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1115 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1116 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1117 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1118 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1119 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1120 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1121 9e49dfc5 Michael Hanselmann
1122 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1123 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1124 9e49dfc5 Michael Hanselmann
1125 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1126 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1127 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1128 9e49dfc5 Michael Hanselmann
1129 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1130 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1131 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1132 942e2262 Michael Hanselmann
1133 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1134 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1135 be760ba8 Michael Hanselmann
    except CancelJob:
1136 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1137 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1138 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1139 942e2262 Michael Hanselmann
1140 942e2262 Michael Hanselmann
    except QueueShutdown:
1141 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1142 942e2262 Michael Hanselmann
1143 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1144 942e2262 Michael Hanselmann
1145 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1146 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1147 942e2262 Michael Hanselmann
1148 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1149 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1150 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1151 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1152 be760ba8 Michael Hanselmann
    else:
1153 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1154 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1155 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1156 be760ba8 Michael Hanselmann
1157 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1158 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1159 be760ba8 Michael Hanselmann

1160 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1161 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1162 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1163 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1164 be760ba8 Michael Hanselmann

1165 be760ba8 Michael Hanselmann
    """
1166 be760ba8 Michael Hanselmann
    queue = self.queue
1167 be760ba8 Michael Hanselmann
    job = self.job
1168 be760ba8 Michael Hanselmann
1169 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1170 be760ba8 Michael Hanselmann
1171 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1172 be760ba8 Michael Hanselmann
    try:
1173 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1174 be760ba8 Michael Hanselmann
1175 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1176 c0f6d0d8 Michael Hanselmann
1177 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1178 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1179 75d81fc8 Michael Hanselmann
        return self.FINISHED
1180 66bd7445 Michael Hanselmann
1181 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1182 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1183 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1184 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1185 26d3fd2f Michael Hanselmann
      else:
1186 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1187 26d3fd2f Michael Hanselmann
          _nextop_fn()
1188 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1189 26d3fd2f Michael Hanselmann
1190 b80cc518 Michael Hanselmann
      op = opctx.op
1191 be760ba8 Michael Hanselmann
1192 be760ba8 Michael Hanselmann
      # Consistency check
1193 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1194 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1195 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1196 be760ba8 Michael Hanselmann
1197 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1198 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1199 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1200 be760ba8 Michael Hanselmann
1201 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1202 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1203 26d3fd2f Michael Hanselmann
1204 b95479a5 Michael Hanselmann
      waitjob = None
1205 b95479a5 Michael Hanselmann
1206 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1207 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1208 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1209 30c945d0 Michael Hanselmann
1210 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1211 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1212 5fd6b694 Michael Hanselmann
          # Write to disk
1213 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1214 be760ba8 Michael Hanselmann
1215 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1216 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1217 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1218 b95479a5 Michael Hanselmann
        assert waitjob is None
1219 b95479a5 Michael Hanselmann
1220 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1221 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1222 be760ba8 Michael Hanselmann
1223 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1224 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1225 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1226 be760ba8 Michael Hanselmann
1227 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1228 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1229 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1230 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1231 be760ba8 Michael Hanselmann
1232 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1233 b95479a5 Michael Hanselmann
1234 b95479a5 Michael Hanselmann
          queue.release()
1235 b95479a5 Michael Hanselmann
          try:
1236 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1237 b95479a5 Michael Hanselmann
          finally:
1238 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1239 b95479a5 Michael Hanselmann
1240 b95479a5 Michael Hanselmann
          op.status = op_status
1241 b95479a5 Michael Hanselmann
          op.result = op_result
1242 b95479a5 Michael Hanselmann
1243 b95479a5 Michael Hanselmann
          assert not waitjob
1244 be760ba8 Michael Hanselmann
1245 942e2262 Michael Hanselmann
        if op.status in (constants.OP_STATUS_WAITING,
1246 942e2262 Michael Hanselmann
                         constants.OP_STATUS_QUEUED):
1247 942e2262 Michael Hanselmann
          # waiting: Couldn't get locks in time
1248 942e2262 Michael Hanselmann
          # queued: Queue is shutting down
1249 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1250 be760ba8 Michael Hanselmann
        else:
1251 26d3fd2f Michael Hanselmann
          # Finalize opcode
1252 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1253 be760ba8 Michael Hanselmann
1254 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1255 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1256 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1257 26d3fd2f Michael Hanselmann
          else:
1258 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1259 be760ba8 Michael Hanselmann
1260 942e2262 Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
1261 942e2262 Michael Hanselmann
        # Queue is shutting down
1262 942e2262 Michael Hanselmann
        assert not waitjob
1263 942e2262 Michael Hanselmann
1264 942e2262 Michael Hanselmann
        finalize = False
1265 942e2262 Michael Hanselmann
1266 942e2262 Michael Hanselmann
        # Reset context
1267 942e2262 Michael Hanselmann
        job.cur_opctx = None
1268 942e2262 Michael Hanselmann
1269 942e2262 Michael Hanselmann
        # In no case must the status be finalized here
1270 942e2262 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1271 942e2262 Michael Hanselmann
1272 942e2262 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1273 be760ba8 Michael Hanselmann
        finalize = False
1274 be760ba8 Michael Hanselmann
1275 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1276 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1277 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1278 be760ba8 Michael Hanselmann
1279 26d3fd2f Michael Hanselmann
        # Keep around for another round
1280 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1281 be760ba8 Michael Hanselmann
1282 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1283 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1284 be760ba8 Michael Hanselmann
1285 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1286 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1287 be760ba8 Michael Hanselmann
1288 be760ba8 Michael Hanselmann
      else:
1289 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1290 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1291 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1292 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1293 26d3fd2f Michael Hanselmann
1294 26d3fd2f Michael Hanselmann
        # Reset context
1295 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1296 26d3fd2f Michael Hanselmann
1297 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1298 26d3fd2f Michael Hanselmann
          finalize = False
1299 26d3fd2f Michael Hanselmann
1300 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1301 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1302 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1303 26d3fd2f Michael Hanselmann
1304 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1305 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1306 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1307 26d3fd2f Michael Hanselmann
          finalize = True
1308 be760ba8 Michael Hanselmann
1309 26d3fd2f Michael Hanselmann
          # Consistency check
1310 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1311 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1312 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1313 be760ba8 Michael Hanselmann
1314 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1315 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1316 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1317 26d3fd2f Michael Hanselmann
          finalize = True
1318 26d3fd2f Michael Hanselmann
1319 26d3fd2f Michael Hanselmann
        else:
1320 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1321 26d3fd2f Michael Hanselmann
1322 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1323 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1324 66bd7445 Michael Hanselmann
          finalize = True
1325 66bd7445 Michael Hanselmann
1326 66bd7445 Michael Hanselmann
        if finalize:
1327 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1328 66bd7445 Michael Hanselmann
          job.Finalize()
1329 26d3fd2f Michael Hanselmann
1330 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1331 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1332 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1333 be760ba8 Michael Hanselmann
1334 b95479a5 Michael Hanselmann
        assert not waitjob
1335 b95479a5 Michael Hanselmann
1336 66bd7445 Michael Hanselmann
        if finalize:
1337 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1338 75d81fc8 Michael Hanselmann
          return self.FINISHED
1339 be760ba8 Michael Hanselmann
1340 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1341 b95479a5 Michael Hanselmann
1342 75d81fc8 Michael Hanselmann
      if waitjob:
1343 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1344 75d81fc8 Michael Hanselmann
      else:
1345 75d81fc8 Michael Hanselmann
        return self.DEFER
1346 be760ba8 Michael Hanselmann
    finally:
1347 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1348 be760ba8 Michael Hanselmann
      queue.release()
1349 be760ba8 Michael Hanselmann
1350 be760ba8 Michael Hanselmann
1351 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
1352 df5a5730 Michael Hanselmann
  """Looks at a result from L{_JobProcessor} for a job.
1353 df5a5730 Michael Hanselmann

1354 df5a5730 Michael Hanselmann
  To be used in a L{_JobQueueWorker}.
1355 df5a5730 Michael Hanselmann

1356 df5a5730 Michael Hanselmann
  """
1357 df5a5730 Michael Hanselmann
  if result == _JobProcessor.FINISHED:
1358 df5a5730 Michael Hanselmann
    # Notify waiting jobs
1359 df5a5730 Michael Hanselmann
    depmgr.NotifyWaiters(job.id)
1360 df5a5730 Michael Hanselmann
1361 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.DEFER:
1362 df5a5730 Michael Hanselmann
    # Schedule again
1363 df5a5730 Michael Hanselmann
    raise workerpool.DeferTask(priority=job.CalcPriority())
1364 df5a5730 Michael Hanselmann
1365 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.WAITDEP:
1366 df5a5730 Michael Hanselmann
    # No-op, dependency manager will re-schedule
1367 df5a5730 Michael Hanselmann
    pass
1368 df5a5730 Michael Hanselmann
1369 df5a5730 Michael Hanselmann
  else:
1370 df5a5730 Michael Hanselmann
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1371 df5a5730 Michael Hanselmann
                                 (result, ))
1372 df5a5730 Michael Hanselmann
1373 df5a5730 Michael Hanselmann
1374 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
1375 031a3e57 Michael Hanselmann
  """The actual job workers.
1376 031a3e57 Michael Hanselmann

1377 031a3e57 Michael Hanselmann
  """
1378 b459a848 Andrea Spadaccini
  def RunTask(self, job): # pylint: disable=W0221
1379 e2715f69 Michael Hanselmann
    """Job executor.
1380 e2715f69 Michael Hanselmann

1381 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1382 ea03467c Iustin Pop
    @param job: the job to be processed
1383 ea03467c Iustin Pop

1384 e2715f69 Michael Hanselmann
    """
1385 f8a4adfa Michael Hanselmann
    assert job.writable, "Expected writable job"
1386 f8a4adfa Michael Hanselmann
1387 b95479a5 Michael Hanselmann
    # Ensure only one worker is active on a single job. If a job registers for
1388 b95479a5 Michael Hanselmann
    # a dependency job, and the other job notifies before the first worker is
1389 b95479a5 Michael Hanselmann
    # done, the job can end up in the tasklist more than once.
1390 b95479a5 Michael Hanselmann
    job.processor_lock.acquire()
1391 b95479a5 Michael Hanselmann
    try:
1392 b95479a5 Michael Hanselmann
      return self._RunTaskInner(job)
1393 b95479a5 Michael Hanselmann
    finally:
1394 b95479a5 Michael Hanselmann
      job.processor_lock.release()
1395 b95479a5 Michael Hanselmann
1396 b95479a5 Michael Hanselmann
  def _RunTaskInner(self, job):
1397 b95479a5 Michael Hanselmann
    """Executes a job.
1398 b95479a5 Michael Hanselmann

1399 b95479a5 Michael Hanselmann
    Must be called with per-job lock acquired.
1400 b95479a5 Michael Hanselmann

1401 b95479a5 Michael Hanselmann
    """
1402 be760ba8 Michael Hanselmann
    queue = job.queue
1403 be760ba8 Michael Hanselmann
    assert queue == self.pool.queue
1404 be760ba8 Michael Hanselmann
1405 0aeeb6e3 Michael Hanselmann
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1406 0aeeb6e3 Michael Hanselmann
    setname_fn(None)
1407 daba67c7 Michael Hanselmann
1408 be760ba8 Michael Hanselmann
    proc = mcpu.Processor(queue.context, job.id)
1409 be760ba8 Michael Hanselmann
1410 0aeeb6e3 Michael Hanselmann
    # Create wrapper for setting thread name
1411 0aeeb6e3 Michael Hanselmann
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1412 0aeeb6e3 Michael Hanselmann
                                    proc.ExecOpCode)
1413 0aeeb6e3 Michael Hanselmann
1414 df5a5730 Michael Hanselmann
    _EvaluateJobProcessorResult(queue.depmgr, job,
1415 df5a5730 Michael Hanselmann
                                _JobProcessor(queue, wrap_execop_fn, job)())
1416 75d81fc8 Michael Hanselmann
1417 0aeeb6e3 Michael Hanselmann
  @staticmethod
1418 0aeeb6e3 Michael Hanselmann
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1419 0aeeb6e3 Michael Hanselmann
    """Updates the worker thread name to include a short summary of the opcode.
1420 0aeeb6e3 Michael Hanselmann

1421 0aeeb6e3 Michael Hanselmann
    @param setname_fn: Callable setting worker thread name
1422 0aeeb6e3 Michael Hanselmann
    @param execop_fn: Callable for executing opcode (usually
1423 0aeeb6e3 Michael Hanselmann
                      L{mcpu.Processor.ExecOpCode})
1424 0aeeb6e3 Michael Hanselmann

1425 0aeeb6e3 Michael Hanselmann
    """
1426 0aeeb6e3 Michael Hanselmann
    setname_fn(op)
1427 0aeeb6e3 Michael Hanselmann
    try:
1428 0aeeb6e3 Michael Hanselmann
      return execop_fn(op, *args, **kwargs)
1429 0aeeb6e3 Michael Hanselmann
    finally:
1430 0aeeb6e3 Michael Hanselmann
      setname_fn(None)
1431 0aeeb6e3 Michael Hanselmann
1432 0aeeb6e3 Michael Hanselmann
  @staticmethod
1433 0aeeb6e3 Michael Hanselmann
  def _GetWorkerName(job, op):
1434 0aeeb6e3 Michael Hanselmann
    """Sets the worker thread name.
1435 0aeeb6e3 Michael Hanselmann

1436 0aeeb6e3 Michael Hanselmann
    @type job: L{_QueuedJob}
1437 0aeeb6e3 Michael Hanselmann
    @type op: L{opcodes.OpCode}
1438 0aeeb6e3 Michael Hanselmann

1439 0aeeb6e3 Michael Hanselmann
    """
1440 0aeeb6e3 Michael Hanselmann
    parts = ["Job%s" % job.id]
1441 0aeeb6e3 Michael Hanselmann
1442 0aeeb6e3 Michael Hanselmann
    if op:
1443 0aeeb6e3 Michael Hanselmann
      parts.append(op.TinySummary())
1444 0aeeb6e3 Michael Hanselmann
1445 0aeeb6e3 Michael Hanselmann
    return "/".join(parts)
1446 0aeeb6e3 Michael Hanselmann
1447 e2715f69 Michael Hanselmann
1448 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
1449 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
1450 ea03467c Iustin Pop

1451 ea03467c Iustin Pop
  """
1452 5bdce580 Michael Hanselmann
  def __init__(self, queue):
1453 0aeeb6e3 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("Jq",
1454 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
1455 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
1456 5bdce580 Michael Hanselmann
    self.queue = queue
1457 e2715f69 Michael Hanselmann
1458 e2715f69 Michael Hanselmann
1459 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1460 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1461 b95479a5 Michael Hanselmann

1462 b95479a5 Michael Hanselmann
  """
1463 b95479a5 Michael Hanselmann
  (WAIT,
1464 b95479a5 Michael Hanselmann
   ERROR,
1465 b95479a5 Michael Hanselmann
   CANCEL,
1466 b95479a5 Michael Hanselmann
   CONTINUE,
1467 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1468 b95479a5 Michael Hanselmann
1469 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1470 b95479a5 Michael Hanselmann
    """Initializes this class.
1471 b95479a5 Michael Hanselmann

1472 b95479a5 Michael Hanselmann
    """
1473 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1474 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1475 b95479a5 Michael Hanselmann
1476 b95479a5 Michael Hanselmann
    self._waiters = {}
1477 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1478 b95479a5 Michael Hanselmann
1479 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1480 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1481 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1482 fcb21ad7 Michael Hanselmann

1483 fcb21ad7 Michael Hanselmann
    @type requested: set
1484 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1485 fcb21ad7 Michael Hanselmann

1486 fcb21ad7 Michael Hanselmann
    """
1487 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1488 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1489 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1490 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1491 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1492 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1493 fcb21ad7 Michael Hanselmann
            if waiters]
1494 fcb21ad7 Michael Hanselmann
1495 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1496 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1497 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1498 b95479a5 Michael Hanselmann

1499 b95479a5 Michael Hanselmann
    """
1500 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1501 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1502 b95479a5 Michael Hanselmann
1503 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1504 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1505 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1506 b95479a5 Michael Hanselmann

1507 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1508 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1509 b95479a5 Michael Hanselmann

1510 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1511 b95479a5 Michael Hanselmann
    @param job: Job object
1512 76b62028 Iustin Pop
    @type dep_job_id: int
1513 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1514 b95479a5 Michael Hanselmann
    @type dep_status: list
1515 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1516 b95479a5 Michael Hanselmann

1517 b95479a5 Michael Hanselmann
    """
1518 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1519 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1520 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1521 b95479a5 Michael Hanselmann
1522 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1523 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1524 b95479a5 Michael Hanselmann
1525 b95479a5 Michael Hanselmann
    # Get status of dependency job
1526 b95479a5 Michael Hanselmann
    try:
1527 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1528 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1529 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1530 b95479a5 Michael Hanselmann
1531 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1532 b95479a5 Michael Hanselmann
1533 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1534 b95479a5 Michael Hanselmann
1535 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1536 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1537 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1538 b95479a5 Michael Hanselmann
      return (self.WAIT,
1539 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1540 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1541 b95479a5 Michael Hanselmann
1542 b95479a5 Michael Hanselmann
    # Remove from waiters list
1543 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1544 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1545 b95479a5 Michael Hanselmann
1546 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1547 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1548 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1549 b95479a5 Michael Hanselmann
1550 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1551 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1552 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1553 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1554 b95479a5 Michael Hanselmann
1555 b95479a5 Michael Hanselmann
    else:
1556 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1557 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1558 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1559 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1560 b95479a5 Michael Hanselmann
1561 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1562 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1563 37d76f1e Michael Hanselmann

1564 37d76f1e Michael Hanselmann
    """
1565 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1566 37d76f1e Michael Hanselmann
                   if not waiters]:
1567 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1568 37d76f1e Michael Hanselmann
1569 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1570 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1571 b95479a5 Michael Hanselmann

1572 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1573 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1574 76b62028 Iustin Pop
    @type job_id: int
1575 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1576 b95479a5 Michael Hanselmann

1577 b95479a5 Michael Hanselmann
    """
1578 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1579 b95479a5 Michael Hanselmann
1580 37d76f1e Michael Hanselmann
    self._lock.acquire()
1581 37d76f1e Michael Hanselmann
    try:
1582 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1583 37d76f1e Michael Hanselmann
1584 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1585 37d76f1e Michael Hanselmann
    finally:
1586 37d76f1e Michael Hanselmann
      self._lock.release()
1587 37d76f1e Michael Hanselmann
1588 b95479a5 Michael Hanselmann
    if jobs:
1589 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1590 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1591 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1592 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1593 b95479a5 Michael Hanselmann
1594 b95479a5 Michael Hanselmann
1595 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1596 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1597 ea03467c Iustin Pop

1598 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1599 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1600 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1601 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1602 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1603 ea03467c Iustin Pop

1604 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1605 f1da30e6 Michael Hanselmann

1606 6c881c52 Iustin Pop
  Example::
1607 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1608 6c881c52 Iustin Pop
    @_RequireOpenQueue
1609 6c881c52 Iustin Pop
    def Example(self):
1610 6c881c52 Iustin Pop
      pass
1611 db37da70 Michael Hanselmann

1612 6c881c52 Iustin Pop
  """
1613 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1614 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1615 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1616 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1617 6c881c52 Iustin Pop
  return wrapper
1618 db37da70 Michael Hanselmann
1619 db37da70 Michael Hanselmann
1620 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1621 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1622 c8d0be94 Michael Hanselmann

1623 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1624 c8d0be94 Michael Hanselmann

1625 c8d0be94 Michael Hanselmann
  """
1626 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1627 c8d0be94 Michael Hanselmann
    """Wrapper function.
1628 c8d0be94 Michael Hanselmann

1629 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1630 c8d0be94 Michael Hanselmann

1631 c8d0be94 Michael Hanselmann
    """
1632 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1633 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1634 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1635 c8d0be94 Michael Hanselmann
    if self._drained:
1636 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1637 6d5ea385 Michael Hanselmann
1638 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1639 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1640 6d5ea385 Michael Hanselmann
1641 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1642 c8d0be94 Michael Hanselmann
  return wrapper
1643 c8d0be94 Michael Hanselmann
1644 c8d0be94 Michael Hanselmann
1645 6c881c52 Iustin Pop
class JobQueue(object):
1646 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1647 db37da70 Michael Hanselmann

1648 6c881c52 Iustin Pop
  """
1649 85f03e0d Michael Hanselmann
  def __init__(self, context):
1650 ea03467c Iustin Pop
    """Constructor for JobQueue.
1651 ea03467c Iustin Pop

1652 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
1653 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
1654 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
1655 ea03467c Iustin Pop
    running).
1656 ea03467c Iustin Pop

1657 ea03467c Iustin Pop
    @type context: GanetiContext
1658 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
1659 ea03467c Iustin Pop
        data and other ganeti objects
1660 ea03467c Iustin Pop

1661 ea03467c Iustin Pop
    """
1662 5bdce580 Michael Hanselmann
    self.context = context
1663 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
1664 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
1665 f1da30e6 Michael Hanselmann
1666 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
1667 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
1668 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
1669 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
1670 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
1671 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
1672 ebb80afa Guido Trotter
1673 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
1674 ebb80afa Guido Trotter
    self.release = self._lock.release
1675 85f03e0d Michael Hanselmann
1676 6d5ea385 Michael Hanselmann
    # Accept jobs by default
1677 6d5ea385 Michael Hanselmann
    self._accepting_jobs = True
1678 6d5ea385 Michael Hanselmann
1679 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
1680 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
1681 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1682 f1da30e6 Michael Hanselmann
1683 04ab05ce Michael Hanselmann
    # Read serial file
1684 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
1685 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
1686 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
1687 c4beba1c Iustin Pop
1688 23752136 Michael Hanselmann
    # Get initial list of nodes
1689 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
1690 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
1691 59303563 Iustin Pop
                       if n.master_candidate)
1692 8e00939c Michael Hanselmann
1693 8e00939c Michael Hanselmann
    # Remove master node
1694 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
1695 23752136 Michael Hanselmann
1696 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
1697 23752136 Michael Hanselmann
1698 6d5ea385 Michael Hanselmann
    self._queue_size = None
1699 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1700 6d5ea385 Michael Hanselmann
    assert ht.TInt(self._queue_size)
1701 ff699aa9 Michael Hanselmann
    self._drained = jstore.CheckDrainFlag()
1702 20571a26 Guido Trotter
1703 b95479a5 Michael Hanselmann
    # Job dependencies
1704 b95479a5 Michael Hanselmann
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1705 b95479a5 Michael Hanselmann
                                        self._EnqueueJobs)
1706 fcb21ad7 Michael Hanselmann
    self.context.glm.AddToLockMonitor(self.depmgr)
1707 b95479a5 Michael Hanselmann
1708 85f03e0d Michael Hanselmann
    # Setup worker pool
1709 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
1710 711b5124 Michael Hanselmann
1711 fcd70b89 Klaus Aehlig
  def _PickupJobUnlocked(self, job_id):
1712 fcd70b89 Klaus Aehlig
    """Load a job from the job queue
1713 fcd70b89 Klaus Aehlig

1714 fcd70b89 Klaus Aehlig
    Pick up a job that already is in the job queue and start/resume it.
1715 fcd70b89 Klaus Aehlig

1716 fcd70b89 Klaus Aehlig
    """
1717 fcd70b89 Klaus Aehlig
    job = self._LoadJobUnlocked(job_id)
1718 fcd70b89 Klaus Aehlig
1719 fcd70b89 Klaus Aehlig
    if job is None:
1720 fcd70b89 Klaus Aehlig
      logging.warning("Job %s could not be read", job_id)
1721 fcd70b89 Klaus Aehlig
      return
1722 fcd70b89 Klaus Aehlig
1723 fcd70b89 Klaus Aehlig
    status = job.CalcStatus()
1724 fcd70b89 Klaus Aehlig
    if status == constants.JOB_STATUS_QUEUED:
1725 fcd70b89 Klaus Aehlig
      self._EnqueueJobsUnlocked([job])
1726 fcd70b89 Klaus Aehlig
      logging.info("Restarting job %s", job.id)
1727 fcd70b89 Klaus Aehlig
1728 fcd70b89 Klaus Aehlig
    elif status in (constants.JOB_STATUS_RUNNING,
1729 fcd70b89 Klaus Aehlig
                    constants.JOB_STATUS_WAITING,
1730 fcd70b89 Klaus Aehlig
                    constants.JOB_STATUS_CANCELING):
1731 fcd70b89 Klaus Aehlig
      logging.warning("Unfinished job %s found: %s", job.id, job)
1732 fcd70b89 Klaus Aehlig
1733 fcd70b89 Klaus Aehlig
      if status == constants.JOB_STATUS_WAITING:
1734 fcd70b89 Klaus Aehlig
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1735 fcd70b89 Klaus Aehlig
        self._EnqueueJobsUnlocked([job])
1736 fcd70b89 Klaus Aehlig
        logging.info("Restarting job %s", job.id)
1737 fcd70b89 Klaus Aehlig
      else:
1738 f3ac6f36 Klaus Aehlig
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
1739 fcd70b89 Klaus Aehlig
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1740 f3ac6f36 Klaus Aehlig
                              _EncodeOpError(to_encode))
1741 fcd70b89 Klaus Aehlig
        job.Finalize()
1742 fcd70b89 Klaus Aehlig
1743 fcd70b89 Klaus Aehlig
    self.UpdateJobUnlocked(job)
1744 fcd70b89 Klaus Aehlig
1745 fcd70b89 Klaus Aehlig
  @locking.ssynchronized(_LOCK)
1746 fcd70b89 Klaus Aehlig
  def PickupJob(self, job_id):
1747 fcd70b89 Klaus Aehlig
    self._PickupJobUnlocked(job_id)
1748 fcd70b89 Klaus Aehlig
1749 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
1750 fb1ffbca Michael Hanselmann
    """Gets RPC runner with context.
1751 fb1ffbca Michael Hanselmann

1752 fb1ffbca Michael Hanselmann
    """
1753 fb1ffbca Michael Hanselmann
    return rpc.JobQueueRunner(self.context, address_list)
1754 fb1ffbca Michael Hanselmann
1755 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1756 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1757 99aabbed Iustin Pop
  def AddNode(self, node):
1758 99aabbed Iustin Pop
    """Register a new node with the queue.
1759 99aabbed Iustin Pop

1760 99aabbed Iustin Pop
    @type node: L{objects.Node}
1761 99aabbed Iustin Pop
    @param node: the node object to be added
1762 99aabbed Iustin Pop

1763 99aabbed Iustin Pop
    """
1764 99aabbed Iustin Pop
    node_name = node.name
1765 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1766 23752136 Michael Hanselmann
1767 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1768 fb1ffbca Michael Hanselmann
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1769 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1770 c8457ce7 Iustin Pop
    if msg:
1771 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1772 c8457ce7 Iustin Pop
                      node_name, msg)
1773 23752136 Michael Hanselmann
1774 59303563 Iustin Pop
    if not node.master_candidate:
1775 59303563 Iustin Pop
      # remove if existing, ignoring errors
1776 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1777 59303563 Iustin Pop
      # and skip the replication of the job ids
1778 59303563 Iustin Pop
      return
1779 59303563 Iustin Pop
1780 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1781 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1782 23752136 Michael Hanselmann
1783 d2e03a33 Michael Hanselmann
    # Upload current serial file
1784 e2b4a7ba Michael Hanselmann
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1785 d2e03a33 Michael Hanselmann
1786 fb1ffbca Michael Hanselmann
    # Static address list
1787 fb1ffbca Michael Hanselmann
    addrs = [node.primary_ip]
1788 fb1ffbca Michael Hanselmann
1789 d2e03a33 Michael Hanselmann
    for file_name in files:
1790 9f774ee8 Michael Hanselmann
      # Read file content
1791 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1792 9f774ee8 Michael Hanselmann
1793 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1794 cffbbae7 Michael Hanselmann
                             file_name, content)
1795 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1796 c8457ce7 Iustin Pop
      if msg:
1797 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1798 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1799 d2e03a33 Michael Hanselmann
1800 be6c403e Michael Hanselmann
    # Set queue drained flag
1801 be6c403e Michael Hanselmann
    result = \
1802 be6c403e Michael Hanselmann
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1803 be6c403e Michael Hanselmann
                                                       self._drained)
1804 be6c403e Michael Hanselmann
    msg = result[node_name].fail_msg
1805 be6c403e Michael Hanselmann
    if msg:
1806 be6c403e Michael Hanselmann
      logging.error("Failed to set queue drained flag on node %s: %s",
1807 be6c403e Michael Hanselmann
                    node_name, msg)
1808 be6c403e Michael Hanselmann
1809 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1810 d2e03a33 Michael Hanselmann
1811 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1812 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1813 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1814 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1815 ea03467c Iustin Pop

1816 ea03467c Iustin Pop
    @type node_name: str
1817 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1818 ea03467c Iustin Pop

1819 ea03467c Iustin Pop
    """
1820 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1821 23752136 Michael Hanselmann
1822 7e950d31 Iustin Pop
  @staticmethod
1823 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1824 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1825 ea03467c Iustin Pop

1826 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1827 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1828 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1829 ea03467c Iustin Pop

1830 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1831 ea03467c Iustin Pop
    @type nodes: list
1832 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1833 ea03467c Iustin Pop
    @type failmsg: str
1834 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1835 ea03467c Iustin Pop

1836 ea03467c Iustin Pop
    """
1837 e74798c1 Michael Hanselmann
    failed = []
1838 e74798c1 Michael Hanselmann
    success = []
1839 e74798c1 Michael Hanselmann
1840 e74798c1 Michael Hanselmann
    for node in nodes:
1841 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1842 c8457ce7 Iustin Pop
      if msg:
1843 e74798c1 Michael Hanselmann
        failed.append(node)
1844 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1845 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1846 c8457ce7 Iustin Pop
      else:
1847 c8457ce7 Iustin Pop
        success.append(node)
1848 e74798c1 Michael Hanselmann
1849 e74798c1 Michael Hanselmann
    # +1 for the master node
1850 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1851 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1852 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1853 e74798c1 Michael Hanselmann
1854 99aabbed Iustin Pop
  def _GetNodeIp(self):
1855 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1856 99aabbed Iustin Pop

1857 ea03467c Iustin Pop
    @rtype: (list, list)
1858 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1859 ea03467c Iustin Pop
        names and the second one with the node addresses
1860 ea03467c Iustin Pop

1861 99aabbed Iustin Pop
    """
1862 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1863 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1864 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1865 99aabbed Iustin Pop
    return name_list, addr_list
1866 99aabbed Iustin Pop
1867 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1868 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1869 8e00939c Michael Hanselmann

1870 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1871 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1872 ea03467c Iustin Pop

1873 ea03467c Iustin Pop
    @type file_name: str
1874 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1875 ea03467c Iustin Pop
    @type data: str
1876 ea03467c Iustin Pop
    @param data: the new contents of the file
1877 4c36bdf5 Guido Trotter
    @type replicate: boolean
1878 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1879 ea03467c Iustin Pop

1880 8e00939c Michael Hanselmann
    """
1881 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1882 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1883 fe05a931 Michele Tartara
                    gid=getents.daemons_gid,
1884 fe05a931 Michele Tartara
                    mode=constants.JOB_QUEUE_FILES_PERMS)
1885 8e00939c Michael Hanselmann
1886 4c36bdf5 Guido Trotter
    if replicate:
1887 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1888 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1889 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1890 23752136 Michael Hanselmann
1891 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1892 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1893 ea03467c Iustin Pop

1894 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1895 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1896 ea03467c Iustin Pop

1897 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1898 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1899 ea03467c Iustin Pop

1900 ea03467c Iustin Pop
    """
1901 dd875d32 Michael Hanselmann
    # Rename them locally
1902 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1903 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1904 abc1f2ce Michael Hanselmann
1905 dd875d32 Michael Hanselmann
    # ... and on all nodes
1906 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1907 fb1ffbca Michael Hanselmann
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1908 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1909 abc1f2ce Michael Hanselmann
1910 85f03e0d Michael Hanselmann
  @staticmethod
1911 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1912 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1913 ea03467c Iustin Pop

1914 ea03467c Iustin Pop
    @type job_id: str
1915 ea03467c Iustin Pop
    @param job_id: the job identifier
1916 ea03467c Iustin Pop
    @rtype: str
1917 ea03467c Iustin Pop
    @return: the path to the job file
1918 ea03467c Iustin Pop

1919 ea03467c Iustin Pop
    """
1920 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1921 f1da30e6 Michael Hanselmann
1922 1410a389 Michael Hanselmann
  @staticmethod
1923 1410a389 Michael Hanselmann
  def _GetArchivedJobPath(job_id):
1924 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1925 ea03467c Iustin Pop

1926 ea03467c Iustin Pop
    @type job_id: str
1927 ea03467c Iustin Pop
    @param job_id: the job identifier
1928 ea03467c Iustin Pop
    @rtype: str
1929 ea03467c Iustin Pop
    @return: the path to the archived job file
1930 ea03467c Iustin Pop

1931 ea03467c Iustin Pop
    """
1932 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1933 1410a389 Michael Hanselmann
                          jstore.GetArchiveDirectory(job_id),
1934 1410a389 Michael Hanselmann
                          "job-%s" % job_id)
1935 0cb94105 Michael Hanselmann
1936 cb66225d Michael Hanselmann
  @staticmethod
1937 0422250e Michael Hanselmann
  def _DetermineJobDirectories(archived):
1938 bb921668 Michael Hanselmann
    """Build list of directories containing job files.
1939 bb921668 Michael Hanselmann

1940 bb921668 Michael Hanselmann
    @type archived: bool
1941 bb921668 Michael Hanselmann
    @param archived: Whether to include directories for archived jobs
1942 bb921668 Michael Hanselmann
    @rtype: list
1943 bb921668 Michael Hanselmann

1944 bb921668 Michael Hanselmann
    """
1945 0422250e Michael Hanselmann
    result = [pathutils.QUEUE_DIR]
1946 0422250e Michael Hanselmann
1947 0422250e Michael Hanselmann
    if archived:
1948 0422250e Michael Hanselmann
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1949 0422250e Michael Hanselmann
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
1950 0422250e Michael Hanselmann
                        utils.ListVisibleFiles(archive_path)))
1951 0422250e Michael Hanselmann
1952 0422250e Michael Hanselmann
    return result
1953 0422250e Michael Hanselmann
1954 0422250e Michael Hanselmann
  @classmethod
1955 0422250e Michael Hanselmann
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1956 911a495b Iustin Pop
    """Return all known job IDs.
1957 911a495b Iustin Pop

1958 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1959 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1960 ac0930b9 Iustin Pop
    extra IDs).
1961 ac0930b9 Iustin Pop

1962 85a1c57d Guido Trotter
    @type sort: boolean
1963 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1964 ea03467c Iustin Pop
    @rtype: list
1965 ea03467c Iustin Pop
    @return: the list of job IDs
1966 ea03467c Iustin Pop

1967 911a495b Iustin Pop
    """
1968 85a1c57d Guido Trotter
    jlist = []
1969 0422250e Michael Hanselmann
1970 0422250e Michael Hanselmann
    for path in cls._DetermineJobDirectories(archived):
1971 0422250e Michael Hanselmann
      for filename in utils.ListVisibleFiles(path):
1972 0422250e Michael Hanselmann
        m = constants.JOB_FILE_RE.match(filename)
1973 0422250e Michael Hanselmann
        if m:
1974 0422250e Michael Hanselmann
          jlist.append(int(m.group(1)))
1975 0422250e Michael Hanselmann
1976 85a1c57d Guido Trotter
    if sort:
1977 76b62028 Iustin Pop
      jlist.sort()
1978 f0d874fe Iustin Pop
    return jlist
1979 911a495b Iustin Pop
1980 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1981 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1982 ea03467c Iustin Pop

1983 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1984 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1985 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1986 ea03467c Iustin Pop

1987 76b62028 Iustin Pop
    @type job_id: int
1988 ea03467c Iustin Pop
    @param job_id: the job id
1989 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1990 ea03467c Iustin Pop
    @return: either None or the job object
1991 ea03467c Iustin Pop

1992 ea03467c Iustin Pop
    """
1993 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1994 5685c1a5 Michael Hanselmann
    if job:
1995 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1996 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
1997 5685c1a5 Michael Hanselmann
      return job
1998 ac0930b9 Iustin Pop
1999 3d6c5566 Guido Trotter
    try:
2000 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
2001 aa9f8167 Iustin Pop
      if job is None:
2002 aa9f8167 Iustin Pop
        return job
2003 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
2004 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
2005 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
2006 3d6c5566 Guido Trotter
      if old_path == new_path:
2007 3d6c5566 Guido Trotter
        # job already archived (future case)
2008 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
2009 3d6c5566 Guido Trotter
      else:
2010 3d6c5566 Guido Trotter
        # non-archived case
2011 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
2012 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
2013 3d6c5566 Guido Trotter
      return None
2014 162c8636 Guido Trotter
2015 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
2016 c0f6d0d8 Michael Hanselmann
2017 162c8636 Guido Trotter
    self._memcache[job_id] = job
2018 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
2019 162c8636 Guido Trotter
    return job
2020 162c8636 Guido Trotter
2021 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2022 162c8636 Guido Trotter
    """Load the given job file from disk.
2023 162c8636 Guido Trotter

2024 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2025 162c8636 Guido Trotter

2026 76b62028 Iustin Pop
    @type job_id: int
2027 162c8636 Guido Trotter
    @param job_id: job identifier
2028 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2029 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2030 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
2031 162c8636 Guido Trotter
    @return: either None or the job object
2032 162c8636 Guido Trotter

2033 162c8636 Guido Trotter
    """
2034 8a3cd185 Michael Hanselmann
    path_functions = [(self._GetJobPath, False)]
2035 194c8ca4 Michael Hanselmann
2036 194c8ca4 Michael Hanselmann
    if try_archived:
2037 8a3cd185 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, True))
2038 194c8ca4 Michael Hanselmann
2039 194c8ca4 Michael Hanselmann
    raw_data = None
2040 8a3cd185 Michael Hanselmann
    archived = None
2041 194c8ca4 Michael Hanselmann
2042 8a3cd185 Michael Hanselmann
    for (fn, archived) in path_functions:
2043 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
2044 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
2045 194c8ca4 Michael Hanselmann
      try:
2046 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
2047 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
2048 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
2049 194c8ca4 Michael Hanselmann
          raise
2050 194c8ca4 Michael Hanselmann
      else:
2051 194c8ca4 Michael Hanselmann
        break
2052 194c8ca4 Michael Hanselmann
2053 194c8ca4 Michael Hanselmann
    if not raw_data:
2054 194c8ca4 Michael Hanselmann
      return None
2055 13998ef2 Michael Hanselmann
2056 c0f6d0d8 Michael Hanselmann
    if writable is None:
2057 8a3cd185 Michael Hanselmann
      writable = not archived
2058 c0f6d0d8 Michael Hanselmann
2059 94ed59a5 Iustin Pop
    try:
2060 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2061 8a3cd185 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable, archived)
2062 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2063 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2064 94ed59a5 Iustin Pop
2065 ac0930b9 Iustin Pop
    return job
2066 f1da30e6 Michael Hanselmann
2067 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2068 0f9c08dc Guido Trotter
    """Load the given job file from disk.
2069 0f9c08dc Guido Trotter

2070 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2071 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
2072 0f9c08dc Guido Trotter
    exception is logged.
2073 0f9c08dc Guido Trotter

2074 76b62028 Iustin Pop
    @type job_id: int
2075 0f9c08dc Guido Trotter
    @param job_id: job identifier
2076 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2077 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2078 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
2079 0f9c08dc Guido Trotter
    @return: either None or the job object
2080 0f9c08dc Guido Trotter

2081 0f9c08dc Guido Trotter
    """
2082 0f9c08dc Guido Trotter
    try:
2083 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2084 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
2085 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
2086 0f9c08dc Guido Trotter
      return None
2087 0f9c08dc Guido Trotter
2088 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2089 20571a26 Guido Trotter
    """Update the queue size.
2090 20571a26 Guido Trotter

2091 20571a26 Guido Trotter
    """
2092 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2093 20571a26 Guido Trotter
2094 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2095 20571a26 Guido Trotter
  @_RequireOpenQueue
2096 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
2097 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
2098 3ccafd0e Iustin Pop

2099 ea03467c Iustin Pop
    @type drain_flag: boolean
2100 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
2101 ea03467c Iustin Pop

2102 3ccafd0e Iustin Pop
    """
2103 be6c403e Michael Hanselmann
    # Change flag locally
2104 ff699aa9 Michael Hanselmann
    jstore.SetDrainFlag(drain_flag)
2105 20571a26 Guido Trotter
2106 20571a26 Guido Trotter
    self._drained = drain_flag
2107 20571a26 Guido Trotter
2108 be6c403e Michael Hanselmann
    # ... and on all nodes
2109 be6c403e Michael Hanselmann
    (names, addrs) = self._GetNodeIp()
2110 be6c403e Michael Hanselmann
    result = \
2111 be6c403e Michael Hanselmann
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2112 be6c403e Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
2113 be6c403e Michael Hanselmann
                         "Setting queue drain flag to %s" % drain_flag)
2114 be6c403e Michael Hanselmann
2115 3ccafd0e Iustin Pop
    return True
2116 3ccafd0e Iustin Pop
2117 704b51ff Klaus Aehlig
  @classmethod
2118 704b51ff Klaus Aehlig
  def SubmitJob(cls, ops):
2119 2971c913 Iustin Pop
    """Create and store a new job.
2120 2971c913 Iustin Pop

2121 2971c913 Iustin Pop
    """
2122 704b51ff Klaus Aehlig
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
2123 2971c913 Iustin Pop
2124 704b51ff Klaus Aehlig
  @classmethod
2125 704b51ff Klaus Aehlig
  def SubmitJobToDrainedQueue(cls, ops):
2126 346c3037 Klaus Aehlig
    """Forcefully create and store a new job.
2127 346c3037 Klaus Aehlig

2128 346c3037 Klaus Aehlig
    Do so, even if the job queue is drained.
2129 346c3037 Klaus Aehlig

2130 346c3037 Klaus Aehlig
    """
2131 704b51ff Klaus Aehlig
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
2132 704b51ff Klaus Aehlig
        .SubmitJobToDrainedQueue(ops)
2133 346c3037 Klaus Aehlig
2134 704b51ff Klaus Aehlig
  @classmethod
2135 704b51ff Klaus Aehlig
  def SubmitManyJobs(cls, jobs):
2136 2971c913 Iustin Pop
    """Create and store multiple jobs.
2137 2971c913 Iustin Pop

2138 2971c913 Iustin Pop
    """
2139 704b51ff Klaus Aehlig
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
2140 2971c913 Iustin Pop
2141 b247c6fc Michael Hanselmann
  @staticmethod
2142 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
2143 b247c6fc Michael Hanselmann
    """Formats errors which occurred while submitting a job.
2144 b247c6fc Michael Hanselmann

2145 b247c6fc Michael Hanselmann
    """
2146 b247c6fc Michael Hanselmann
    return ("%s; opcodes %s" %
2147 b247c6fc Michael Hanselmann
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2148 b247c6fc Michael Hanselmann
2149 b247c6fc Michael Hanselmann
  @staticmethod
2150 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
2151 b247c6fc Michael Hanselmann
    """Resolves relative job IDs in dependencies.
2152 b247c6fc Michael Hanselmann

2153 b247c6fc Michael Hanselmann
    @type resolve_fn: callable
2154 b247c6fc Michael Hanselmann
    @param resolve_fn: Function to resolve a relative job ID
2155 b247c6fc Michael Hanselmann
    @type deps: list
2156 b247c6fc Michael Hanselmann
    @param deps: Dependencies
2157 4c27b231 Michael Hanselmann
    @rtype: tuple; (boolean, string or list)
2158 4c27b231 Michael Hanselmann
    @return: If successful (first tuple item), the returned list contains
2159 4c27b231 Michael Hanselmann
      resolved job IDs along with the requested status; if not successful,
2160 4c27b231 Michael Hanselmann
      the second element is an error message
2161 b247c6fc Michael Hanselmann

2162 b247c6fc Michael Hanselmann
    """
2163 b247c6fc Michael Hanselmann
    result = []
2164 b247c6fc Michael Hanselmann
2165 b247c6fc Michael Hanselmann
    for (dep_job_id, dep_status) in deps:
2166 b247c6fc Michael Hanselmann
      if ht.TRelativeJobId(dep_job_id):
2167 b247c6fc Michael Hanselmann
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2168 b247c6fc Michael Hanselmann
        try:
2169 b247c6fc Michael Hanselmann
          job_id = resolve_fn(dep_job_id)
2170 b247c6fc Michael Hanselmann
        except IndexError:
2171 b247c6fc Michael Hanselmann
          # Abort
2172 b247c6fc Michael Hanselmann
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2173 b247c6fc Michael Hanselmann
      else:
2174 b247c6fc Michael Hanselmann
        job_id = dep_job_id
2175 b247c6fc Michael Hanselmann
2176 b247c6fc Michael Hanselmann
      result.append((job_id, dep_status))
2177 b247c6fc Michael Hanselmann
2178 b247c6fc Michael Hanselmann
    return (True, result)
2179 b247c6fc Michael Hanselmann
2180 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2181 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2182 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2183 7b5c4a69 Michael Hanselmann

2184 7b5c4a69 Michael Hanselmann
    @type jobs: list
2185 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2186 7b5c4a69 Michael Hanselmann

2187 7b5c4a69 Michael Hanselmann
    """
2188 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2189 75d81fc8 Michael Hanselmann
2190 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2191 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2192 75d81fc8 Michael Hanselmann

2193 75d81fc8 Michael Hanselmann
    @type jobs: list
2194 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2195 75d81fc8 Michael Hanselmann

2196 75d81fc8 Michael Hanselmann
    """
2197 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2198 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2199 99fb250b Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs],
2200 99fb250b Michael Hanselmann
                             task_id=map(_GetIdAttr, jobs))
2201 7b5c4a69 Michael Hanselmann
2202 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2203 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2204 b95479a5 Michael Hanselmann

2205 76b62028 Iustin Pop
    @type job_id: int
2206 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2207 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2208 b95479a5 Michael Hanselmann

2209 b95479a5 Michael Hanselmann
    """
2210 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2211 b95479a5 Michael Hanselmann
2212 b95479a5 Michael Hanselmann
    # Try to load from disk
2213 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2214 c0f6d0d8 Michael Hanselmann
2215 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2216 b95479a5 Michael Hanselmann
2217 b95479a5 Michael Hanselmann
    if job:
2218 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2219 b95479a5 Michael Hanselmann
2220 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2221 b95479a5 Michael Hanselmann
2222 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2223 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
2224 ea03467c Iustin Pop
    """Update a job's on disk storage.
2225 ea03467c Iustin Pop

2226 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
2227 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
2228 ea03467c Iustin Pop
    nodes.
2229 ea03467c Iustin Pop

2230 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
2231 ea03467c Iustin Pop
    @param job: the changed job
2232 4c36bdf5 Guido Trotter
    @type replicate: boolean
2233 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
2234 ea03467c Iustin Pop

2235 ea03467c Iustin Pop
    """
2236 66bd7445 Michael Hanselmann
    if __debug__:
2237 66bd7445 Michael Hanselmann
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2238 66bd7445 Michael Hanselmann
      assert (finalized ^ (job.end_timestamp is None))
2239 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't update read-only job"
2240 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't update archived job"
2241 66bd7445 Michael Hanselmann
2242 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
2243 a182a3ed Michael Hanselmann
    data = serializer.DumpJson(job.Serialize())
2244 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
2245 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
2246 ac0930b9 Iustin Pop
2247 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2248 5c735209 Iustin Pop
                        timeout):
2249 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
2250 6c5a7090 Michael Hanselmann

2251 76b62028 Iustin Pop
    @type job_id: int
2252 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
2253 6c5a7090 Michael Hanselmann
    @type fields: list of strings
2254 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
2255 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
2256 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
2257 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
2258 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
2259 5c735209 Iustin Pop
    @type timeout: float
2260 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
2261 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
2262 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
2263 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
2264 ea03467c Iustin Pop

2265 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
2266 ea03467c Iustin Pop
        we instead return a special value,
2267 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2268 ea03467c Iustin Pop
        as such by the clients
2269 6c5a7090 Michael Hanselmann

2270 6c5a7090 Michael Hanselmann
    """
2271 04569469 Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2272 c0f6d0d8 Michael Hanselmann
                             writable=False)
2273 989a8bee Michael Hanselmann
2274 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
2275 989a8bee Michael Hanselmann
2276 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
2277 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
2278 dfe57c22 Michael Hanselmann
2279 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2280 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2281 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
2282 188c5e0a Michael Hanselmann
    """Cancels a job.
2283 188c5e0a Michael Hanselmann

2284 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
2285 ea03467c Iustin Pop

2286 76b62028 Iustin Pop
    @type job_id: int
2287 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
2288 188c5e0a Michael Hanselmann

2289 188c5e0a Michael Hanselmann
    """
2290 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
2291 188c5e0a Michael Hanselmann
2292 aebd0e4e Michael Hanselmann
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2293 aebd0e4e Michael Hanselmann
2294 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2295 4679547e Michael Hanselmann
  @_RequireOpenQueue
2296 4679547e Michael Hanselmann
  def ChangeJobPriority(self, job_id, priority):
2297 4679547e Michael Hanselmann
    """Changes a job's priority.
2298 4679547e Michael Hanselmann

2299 4679547e Michael Hanselmann
    @type job_id: int
2300 4679547e Michael Hanselmann
    @param job_id: ID of the job whose priority should be changed
2301 4679547e Michael Hanselmann
    @type priority: int
2302 4679547e Michael Hanselmann
    @param priority: New priority
2303 4679547e Michael Hanselmann

2304 4679547e Michael Hanselmann
    """
2305 4679547e Michael Hanselmann
    logging.info("Changing priority of job %s to %s", job_id, priority)
2306 4679547e Michael Hanselmann
2307 4679547e Michael Hanselmann
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
2308 4679547e Michael Hanselmann
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2309 4679547e Michael Hanselmann
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
2310 4679547e Michael Hanselmann
                                (priority, allowed))
2311 4679547e Michael Hanselmann
2312 4679547e Michael Hanselmann
    def fn(job):
2313 4679547e Michael Hanselmann
      (success, msg) = job.ChangePriority(priority)
2314 4679547e Michael Hanselmann
2315 4679547e Michael Hanselmann
      if success:
2316 4679547e Michael Hanselmann
        try:
2317 4679547e Michael Hanselmann
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2318 4679547e Michael Hanselmann
        except workerpool.NoSuchTask:
2319 4679547e Michael Hanselmann
          logging.debug("Job %s is not in workerpool at this time", job.id)
2320 4679547e Michael Hanselmann
2321 4679547e Michael Hanselmann
      return (success, msg)
2322 4679547e Michael Hanselmann
2323 4679547e Michael Hanselmann
    return self._ModifyJobUnlocked(job_id, fn)
2324 4679547e Michael Hanselmann
2325 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2326 aebd0e4e Michael Hanselmann
    """Modifies a job.
2327 aebd0e4e Michael Hanselmann

2328 aebd0e4e Michael Hanselmann
    @type job_id: int
2329 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2330 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2331 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2332 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2333 aebd0e4e Michael Hanselmann

2334 aebd0e4e Michael Hanselmann
    """
2335 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2336 188c5e0a Michael Hanselmann
    if not job:
2337 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2338 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2339 fbf0262f Michael Hanselmann
2340 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2341 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2342 c0f6d0d8 Michael Hanselmann
2343 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2344 188c5e0a Michael Hanselmann
2345 099b2870 Michael Hanselmann
    if success:
2346 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2347 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2348 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2349 fbf0262f Michael Hanselmann
2350 099b2870 Michael Hanselmann
    return (success, msg)
2351 fbf0262f Michael Hanselmann
2352 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2353 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2354 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2355 c609f802 Michael Hanselmann

2356 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2357 25e7b43f Iustin Pop
    @param jobs: Job objects
2358 d7fd1f28 Michael Hanselmann
    @rtype: int
2359 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2360 c609f802 Michael Hanselmann

2361 c609f802 Michael Hanselmann
    """
2362 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2363 d7fd1f28 Michael Hanselmann
    rename_files = []
2364 d7fd1f28 Michael Hanselmann
    for job in jobs:
2365 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2366 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't cancel archived job"
2367 c0f6d0d8 Michael Hanselmann
2368 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2369 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2370 d7fd1f28 Michael Hanselmann
        continue
2371 c609f802 Michael Hanselmann
2372 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2373 c609f802 Michael Hanselmann
2374 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2375 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2376 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2377 c609f802 Michael Hanselmann
2378 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2379 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2380 f1da30e6 Michael Hanselmann
2381 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2382 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2383 d7fd1f28 Michael Hanselmann
2384 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2385 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2386 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2387 20571a26 Guido Trotter
    # archived jobs to fix this.
2388 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2389 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2390 78d12585 Michael Hanselmann
2391 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2392 07cd723a Iustin Pop
  @_RequireOpenQueue
2393 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
2394 07cd723a Iustin Pop
    """Archives a job.
2395 07cd723a Iustin Pop

2396 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2397 ea03467c Iustin Pop

2398 76b62028 Iustin Pop
    @type job_id: int
2399 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
2400 78d12585 Michael Hanselmann
    @rtype: bool
2401 78d12585 Michael Hanselmann
    @return: Whether job was archived
2402 07cd723a Iustin Pop

2403 07cd723a Iustin Pop
    """
2404 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
2405 78d12585 Michael Hanselmann
2406 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2407 78d12585 Michael Hanselmann
    if not job:
2408 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2409 78d12585 Michael Hanselmann
      return False
2410 78d12585 Michael Hanselmann
2411 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
2412 07cd723a Iustin Pop
2413 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2414 07cd723a Iustin Pop
  @_RequireOpenQueue
2415 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
2416 07cd723a Iustin Pop
    """Archives all jobs based on age.
2417 07cd723a Iustin Pop

2418 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
2419 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
2420 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
2421 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
2422 07cd723a Iustin Pop

2423 07cd723a Iustin Pop
    @type age: int
2424 07cd723a Iustin Pop
    @param age: the minimum age in seconds
2425 07cd723a Iustin Pop

2426 07cd723a Iustin Pop
    """
2427 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
2428 07cd723a Iustin Pop
2429 07cd723a Iustin Pop
    now = time.time()
2430 f8ad5591 Michael Hanselmann
    end_time = now + timeout
2431 f8ad5591 Michael Hanselmann
    archived_count = 0
2432 f8ad5591 Michael Hanselmann
    last_touched = 0
2433 f8ad5591 Michael Hanselmann
2434 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
2435 d7fd1f28 Michael Hanselmann
    pending = []
2436 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
2437 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
2438 f8ad5591 Michael Hanselmann
2439 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
2440 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
2441 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
2442 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
2443 f8ad5591 Michael Hanselmann
        break
2444 f8ad5591 Michael Hanselmann
2445 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
2446 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
2447 f8ad5591 Michael Hanselmann
      if job:
2448 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
2449 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
2450 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
2451 f8ad5591 Michael Hanselmann
          else:
2452 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
2453 07cd723a Iustin Pop
        else:
2454 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
2455 f8ad5591 Michael Hanselmann
2456 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
2457 d7fd1f28 Michael Hanselmann
          pending.append(job)
2458 d7fd1f28 Michael Hanselmann
2459 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
2460 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
2461 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
2462 d7fd1f28 Michael Hanselmann
            pending = []
2463 f8ad5591 Michael Hanselmann
2464 d7fd1f28 Michael Hanselmann
    if pending:
2465 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
2466 07cd723a Iustin Pop
2467 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
2468 07cd723a Iustin Pop
2469 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
2470 e07f7f7a Michael Hanselmann
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2471 e07f7f7a Michael Hanselmann
                       namefield="id")
2472 e07f7f7a Michael Hanselmann
2473 0422250e Michael Hanselmann
    # Archived jobs are only looked at if the "archived" field is referenced
2474 0422250e Michael Hanselmann
    # either as a requested field or in the filter. By default archived jobs
2475 0422250e Michael Hanselmann
    # are ignored.
2476 0422250e Michael Hanselmann
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2477 0422250e Michael Hanselmann
2478 e07f7f7a Michael Hanselmann
    job_ids = qobj.RequestedNames()
2479 e07f7f7a Michael Hanselmann
2480 e07f7f7a Michael Hanselmann
    list_all = (job_ids is None)
2481 e07f7f7a Michael Hanselmann
2482 e07f7f7a Michael Hanselmann
    if list_all:
2483 e07f7f7a Michael Hanselmann
      # Since files are added to/removed from the queue atomically, there's no
2484 e07f7f7a Michael Hanselmann
      # risk of getting the job ids in an inconsistent state.
2485 0422250e Michael Hanselmann
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2486 e07f7f7a Michael Hanselmann
2487 e07f7f7a Michael Hanselmann
    jobs = []
2488 e07f7f7a Michael Hanselmann
2489 e07f7f7a Michael Hanselmann
    for job_id in job_ids:
2490 e07f7f7a Michael Hanselmann
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2491 e07f7f7a Michael Hanselmann
      if job is not None or not list_all:
2492 e07f7f7a Michael Hanselmann
        jobs.append((job_id, job))
2493 e07f7f7a Michael Hanselmann
2494 e07f7f7a Michael Hanselmann
    return (qobj, jobs, list_all)
2495 e07f7f7a Michael Hanselmann
2496 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
2497 e07f7f7a Michael Hanselmann
    """Returns a list of jobs in queue.
2498 e07f7f7a Michael Hanselmann

2499 e07f7f7a Michael Hanselmann
    @type fields: sequence
2500 e07f7f7a Michael Hanselmann
    @param fields: List of wanted fields
2501 e07f7f7a Michael Hanselmann
    @type qfilter: None or query2 filter (list)
2502 e07f7f7a Michael Hanselmann
    @param qfilter: Query filter
2503 e07f7f7a Michael Hanselmann

2504 e07f7f7a Michael Hanselmann
    """
2505 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2506 e07f7f7a Michael Hanselmann
2507 76b62028 Iustin Pop
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2508 e07f7f7a Michael Hanselmann
2509 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
2510 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
2511 e2715f69 Michael Hanselmann

2512 ea03467c Iustin Pop
    @type job_ids: list
2513 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
2514 ea03467c Iustin Pop
    @type fields: list
2515 ea03467c Iustin Pop
    @param fields: names of fields to return
2516 ea03467c Iustin Pop
    @rtype: list
2517 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
2518 ea03467c Iustin Pop
        the requested fields
2519 e2715f69 Michael Hanselmann

2520 e2715f69 Michael Hanselmann
    """
2521 76b62028 Iustin Pop
    # backwards compat:
2522 76b62028 Iustin Pop
    job_ids = [int(jid) for jid in job_ids]
2523 e07f7f7a Michael Hanselmann
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2524 e2715f69 Michael Hanselmann
2525 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2526 e2715f69 Michael Hanselmann
2527 76b62028 Iustin Pop
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2528 e2715f69 Michael Hanselmann
2529 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2530 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
2531 6d5ea385 Michael Hanselmann
    """Prepare to stop the job queue.
2532 6d5ea385 Michael Hanselmann

2533 6d5ea385 Michael Hanselmann
    Disables execution of jobs in the workerpool and returns whether there are
2534 6d5ea385 Michael Hanselmann
    any jobs currently running. If the latter is the case, the job queue is not
2535 6d5ea385 Michael Hanselmann
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2536 6d5ea385 Michael Hanselmann
    be called without interfering with any job. Queued and unfinished jobs will
2537 6d5ea385 Michael Hanselmann
    be resumed next time.
2538 6d5ea385 Michael Hanselmann

2539 6d5ea385 Michael Hanselmann
    Once this function has been called no new job submissions will be accepted
2540 6d5ea385 Michael Hanselmann
    (see L{_RequireNonDrainedQueue}).
2541 6d5ea385 Michael Hanselmann

2542 6d5ea385 Michael Hanselmann
    @rtype: bool
2543 6d5ea385 Michael Hanselmann
    @return: Whether there are any running jobs
2544 6d5ea385 Michael Hanselmann

2545 6d5ea385 Michael Hanselmann
    """
2546 6d5ea385 Michael Hanselmann
    if self._accepting_jobs:
2547 6d5ea385 Michael Hanselmann
      self._accepting_jobs = False
2548 6d5ea385 Michael Hanselmann
2549 6d5ea385 Michael Hanselmann
      # Tell worker pool to stop processing pending tasks
2550 6d5ea385 Michael Hanselmann
      self._wpool.SetActive(False)
2551 6d5ea385 Michael Hanselmann
2552 6d5ea385 Michael Hanselmann
    return self._wpool.HasRunningTasks()
2553 6d5ea385 Michael Hanselmann
2554 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
2555 942e2262 Michael Hanselmann
    """Returns whether jobs are accepted.
2556 942e2262 Michael Hanselmann

2557 942e2262 Michael Hanselmann
    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2558 942e2262 Michael Hanselmann
    queue is shutting down.
2559 942e2262 Michael Hanselmann

2560 942e2262 Michael Hanselmann
    @rtype: bool
2561 942e2262 Michael Hanselmann

2562 942e2262 Michael Hanselmann
    """
2563 942e2262 Michael Hanselmann
    return self._accepting_jobs
2564 942e2262 Michael Hanselmann
2565 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2566 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2567 e2715f69 Michael Hanselmann
  def Shutdown(self):
2568 e2715f69 Michael Hanselmann
    """Stops the job queue.
2569 e2715f69 Michael Hanselmann

2570 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2571 ea03467c Iustin Pop

2572 e2715f69 Michael Hanselmann
    """
2573 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
2574 85f03e0d Michael Hanselmann
2575 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
2576 a71f9c7d Guido Trotter
    self._queue_filelock = None