Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 653bc0f1

History | View | Annotate | Download (75.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 95a4e33f Hrvoje Ribicic
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 704b51ff Klaus Aehlig
from ganeti import luxi
52 f1da30e6 Michael Hanselmann
from ganeti import opcodes
53 580b1fdd Jose A. Lopes
from ganeti import opcodes_base
54 7a1ecaed Iustin Pop
from ganeti import errors
55 e2715f69 Michael Hanselmann
from ganeti import mcpu
56 7996a135 Iustin Pop
from ganeti import utils
57 04ab05ce Michael Hanselmann
from ganeti import jstore
58 4869595d Petr Pudlak
import ganeti.rpc.node as rpc
59 82b22e19 René Nussbaumer
from ganeti import runtime
60 a744b676 Manuel Franceschini
from ganeti import netutils
61 989a8bee Michael Hanselmann
from ganeti import compat
62 b95479a5 Michael Hanselmann
from ganeti import ht
63 a06c6ae8 Michael Hanselmann
from ganeti import query
64 a06c6ae8 Michael Hanselmann
from ganeti import qlang
65 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
66 cffbbae7 Michael Hanselmann
from ganeti import vcluster
67 e2715f69 Michael Hanselmann
68 fbf0262f Michael Hanselmann
69 1daae384 Iustin Pop
#: Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25

# Names of the lock members handed to the @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Callable returning the "id" attribute of the object passed to it
_GetIdAttr = operator.attrgetter("id")
77 99fb250b Michael Hanselmann
78 498ae1cc Iustin Pop
79 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to request that the current job be canceled.

  """
83 fbf0262f Michael Hanselmann
84 fbf0262f Michael Hanselmann
85 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Exception raised to abort a job because the job queue is shutting down.

  """
89 942e2262 Michael Hanselmann
90 942e2262 Michael Hanselmann
91 70552c46 Michael Hanselmann
def TimeStampNow():
  """Builds a timestamp for the current moment.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
99 70552c46 Michael Hanselmann
100 70552c46 Michael Hanselmann
101 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
  """Sends a job queue file update using the virtualized file name.

  @param runner: object providing C{call_jobqueue_update}
  @param names: passed through unchanged to C{call_jobqueue_update}
  @param file_name: file name, converted with L{vcluster.MakeVirtualPath}
      before being sent
  @param content: passed through unchanged to C{call_jobqueue_update}
  @return: whatever C{call_jobqueue_update} returns

  """
  return runner.call_jobqueue_update(names,
                                     vcluster.MakeVirtualPath(file_name),
                                     content)
107 cffbbae7 Michael Hanselmann
108 cffbbae7 Michael Hanselmann
109 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Callable helper running a fixed-field query on single jobs.

  The query object is built once per instance, so the list of fields is
  cached and can be reused, which is useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Prepares the query object.

    @param fields: names of the job fields to query

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Runs the prepared query against one job.

    @param job: job object; its C{id} attribute is used as the entry name
    @return: first row of the old-style query result

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
126 a06c6ae8 Michael Hanselmann
127 a06c6ae8 Michael Hanselmann
128 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Queue-side wrapper around a single opcode.

  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar log: the execution log, a list of tuples of the form
      C{(log_serial, timestamp, level, message)}
  @ivar priority: the current priority; initialised from the opcode and
      possibly changed later without touching C{input}
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = [
    "input", "status", "result", "log", "priority",
    "start_timestamp", "exec_timestamp", "end_timestamp",
    "__weakref__",
    ]

  def __init__(self, op):
    """Wraps a freshly submitted opcode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

    # Nothing has happened to a new opcode yet, hence no timestamps
    self.start_timestamp = self.exec_timestamp = self.end_timestamp = None

    # Initial priority only; it can be changed while the opcode is pending
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])

    # These keys must be present in the serialized state
    for name in ("status", "result", "log"):
      setattr(restored, name, state[name])

    # Timestamps are optional and default to None
    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name))

    restored.priority = state.get("priority", constants.OP_PRIO_DEFAULT)

    return restored

  def Serialize(self):
    """Converts this _QueuedOpCode into a plain dictionary.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    data = {"input": self.input.__getstate__()}

    # All remaining fields are stored under their attribute name
    for name in ("status", "result", "log",
                 "start_timestamp", "exec_timestamp", "end_timestamp",
                 "priority"):
      data[name] = getattr(self, name)

    return data
201 f1048938 Iustin Pop
202 e2715f69 Michael Hanselmann
203 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified
  @ivar archived: Whether the job was already archived

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def AddReasons(self):
    """Extend the reason trail

    Add the reason for all the opcodes of this job to be executed. Every
    opcode gets one more trail entry of the form
    C{(source, "job=<job id>;index=<opcode position>", timestamp)}.

    """
    for (idx, queued_op) in enumerate(self.ops):
      op = queued_op.input
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
      reason_text = "job=%d;index=%d" % (self.id, idx)

      # Start a fresh trail when the opcode has none; this also covers an
      # attribute explicitly set to None, on which append() would fail
      reason = getattr(op, "reason", None)
      if reason is None:
        reason = []

      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: when the list of opcodes is empty

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    # Needs self.id and self.ops, hence called only after both are set
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    These attributes are not part of the serialized state and are set up
    both for new and for restored jobs.

    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    """Returns a debugging representation with job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide serial is the highest one found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    Only the ID, the opcodes and the timestamps are included; the
    in-memory variables are not.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped
      - the first opcode found to be canceling, canceled or finished
        with error stops the scan and determines the job status
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status, one of the C{constants.JOB_STATUS_*} values

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status, like every other branch does
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used. Otherwise the numerically smallest priority value
    among the unfinished opcodes is returned.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # The serial is the first element of every log entry
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Only the end timestamp is set here.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    A queued job is canceled and finalized right away, a waiting job is
    only marked as canceling; jobs in any other state are left untouched.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    Only opcodes which are still queued or waiting get the new priority;
    running and finalized opcodes are skipped.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)
547 4679547e Michael Hanselmann
548 f1048938 Iustin Pop
549 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks tying the execution of one opcode to its job and queue.

  The methods update the opcode/job state kept by the job queue: marking the
  opcode as running, appending log messages, reporting the opcode's current
  priority and forwarding job submissions to the queue.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts processing if a cancel or a queue shutdown was requested.

    The opcode status is checked first, the queue state second.

    @raise CancelJob: if the opcode status is OP_STATUS_CANCELING
    @raise QueueShutdown: if the queue no longer accepts jobs

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # Nothing else to do while the queue is still accepting jobs
    if self._queue.AcceptingJobsUnlocked():
      return

    logging.debug("Queue is shutting down")
    raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    The execution timestamp is recorded and the job is handed to the queue
    for updating afterwards.

    """
    job = self._job
    op = self._op

    assert op in job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Raises if the opcode is being cancelled or the queue is shutting down
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    Increments the job's log serial, appends a
    C{(serial, timestamp, log_type, log_msg)} tuple to the opcode's log and
    asks the queue to update the job with C{replicate=False}.

    @param timestamp: Timestamp to store with the log entry
    @param log_type: Log entry type
    @param log_msg: Log message

    """
    job = self._job
    job.log_serial += 1
    self._op.log.append((job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either a single argument (the message, logged with type
    L{constants.ELOG_MESSAGE}) or two arguments (log type and message).

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    self._AppendFeedback(utils.SplitTime(time.time()), log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    @raise CancelJob: see L{_CheckCancel}
    @raise QueueShutdown: see L{_CheckCancel}

    """
    op = self._op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    queue = self._queue
    return queue.SubmitManyJobs(jobs)
654 6a373640 Michael Hanselmann
655 031a3e57 Michael Hanselmann
656 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable deciding whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job is no longer
        queued/waiting/running, if its info differs from the previous info
        or if the first returned log entry has a serial different from the
        previous one; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    raw_info = self._squery(job)
    raw_log = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    (job_info, log_entries) = \
      [serializer.LoadJson(serializer.DumpJson(data))
       for data in (raw_info, raw_log)]

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    changed = (status not in (constants.JOB_STATUS_QUEUED,
                              constants.JOB_STATUS_RUNNING,
                              constants.JOB_STATUS_WAITING) or
               job_info != self._prev_job_info or
               (log_entries and
                self._prev_log_serial != log_entries[0][0]))

    if not changed:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
708 989a8bee Michael Hanselmann
709 989a8bee Michael Hanselmann
710 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  """Waits for events on a single job file using pyinotify.

  """
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @param _inotify_wm_cls: Class (or factory) used to create the watch
        manager
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = asyncnotifier.SingleFileEventHandler(
      self._wm, self._OnInotify, filename)
    self._notifier = pyinotify.Notifier(
      self._wm, default_proc_fun=self._inotify_handler)

    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Enables the inotify handler again if it is reported as not enabled.

    @param notifier_enabled: Whether the handler is still enabled

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    Pending events are read (if there are any) and then processed.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    # The timeout in seconds is scaled by 1000 for the notifier
    got_events = self._notifier.check_events(timeout * 1000)
    if got_events:
      self._notifier.read_events()

    self._notifier.process_events()

    return got_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    notifier = self._notifier
    notifier.stop()
758 989a8bee Michael Hanselmann
759 989a8bee Michael Hanselmann
760 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  """Lazily created wrapper around a job file waiter.

  """
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    No file waiter is created here; that is deferred to the first call of
    L{Wait}.

    @type filename: string
    @param filename: Path to job file
    @param _waiter_cls: Class (or factory) used to create the file waiter

    """
    self._filename = filename
    self._waiter_cls = _waiter_cls
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events; always C{True} on the call
        which creates the file waiter

    """
    if not self._filewaiter:
      # Lazy setup: Avoid inotify setup cost when job file has already
      # changed. If this point is reached, return immediately and let caller
      # check the job file again in case there were changes since the last
      # check. This avoids a race condition.
      self._filewaiter = self._waiter_cls(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter.

    Does nothing if no file waiter has been created.

    """
    if not self._filewaiter:
      return

    self._filewaiter.Close()
797 989a8bee Michael Hanselmann
798 989a8bee Michael Hanselmann
799 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job and checks it for changes once.

    @param counter: Iterator whose next value tells how many checks were
        already made
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning C{None} for an unchanged job
    @return: The (non-C{None}) value returned by C{check_fn}
    @raise errors.JobLost: if C{job_load_fn} returned no job
    @raise utils.RetryAgain: if C{check_fn} reported no change

    """
    attempt = counter.next()
    if attempt > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    changes = check_fn(job)
    if changes is not None:
      return changes

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @param _waiter_cls: Class (or factory) used to create the waiter
    @return: The result of the change check; C{None} if the job was lost;
        L{constants.JOB_NOTCHANGED} if the retry timed out

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        poll_fn = compat.partial(self._CheckForChanges,
                                 counter, job_load_fn, check_fn)
        return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # The waiter is closed no matter how the retry loop ended
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
858 6c2549d6 Guido Trotter
859 6c2549d6 Guido Trotter
860 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Instances of L{errors.GenericError} are encoded as they are; anything else
  is first wrapped in an L{errors.OpExecError} built from its string
  representation.

  @param err: Error to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
870 6760e4ed Michael Hanselmann
871 6760e4ed Michael Hanselmann
872 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
873 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
874 26d3fd2f Michael Hanselmann
    """Initializes this class.
875 26d3fd2f Michael Hanselmann

876 26d3fd2f Michael Hanselmann
    """
877 26d3fd2f Michael Hanselmann
    self._fn = fn
878 26d3fd2f Michael Hanselmann
    self._next = None
879 26d3fd2f Michael Hanselmann
880 26d3fd2f Michael Hanselmann
  def _Advance(self):
881 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
882 26d3fd2f Michael Hanselmann

883 26d3fd2f Michael Hanselmann
    """
884 26d3fd2f Michael Hanselmann
    if self._next is None:
885 26d3fd2f Michael Hanselmann
      self._next = self._fn()
886 26d3fd2f Michael Hanselmann
887 26d3fd2f Michael Hanselmann
  def Peek(self):
888 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
889 26d3fd2f Michael Hanselmann

890 26d3fd2f Michael Hanselmann
    """
891 26d3fd2f Michael Hanselmann
    self._Advance()
892 26d3fd2f Michael Hanselmann
    return self._next
893 26d3fd2f Michael Hanselmann
894 26d3fd2f Michael Hanselmann
  def Next(self):
895 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
896 26d3fd2f Michael Hanselmann

897 26d3fd2f Michael Hanselmann
    """
898 26d3fd2f Michael Hanselmann
    self._Advance()
899 26d3fd2f Michael Hanselmann
    result = self._next
900 26d3fd2f Michael Hanselmann
    self._next = None
901 26d3fd2f Michael Hanselmann
    return result
902 26d3fd2f Michael Hanselmann
903 26d3fd2f Michael Hanselmann
904 b80cc518 Michael Hanselmann
class _OpExecContext:
905 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
906 b80cc518 Michael Hanselmann
    """Initializes this class.
907 b80cc518 Michael Hanselmann

908 b80cc518 Michael Hanselmann
    """
909 b80cc518 Michael Hanselmann
    self.op = op
910 b80cc518 Michael Hanselmann
    self.index = index
911 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
912 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
913 b80cc518 Michael Hanselmann
914 b95479a5 Michael Hanselmann
    # Create local copy to modify
915 580b1fdd Jose A. Lopes
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
916 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
917 b95479a5 Michael Hanselmann
    else:
918 b95479a5 Michael Hanselmann
      self.jobdeps = None
919 b95479a5 Michael Hanselmann
920 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
921 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
922 26d3fd2f Michael Hanselmann
923 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
924 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
925 26d3fd2f Michael Hanselmann

926 26d3fd2f Michael Hanselmann
    """
927 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
928 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
929 26d3fd2f Michael Hanselmann
930 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
931 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
932 26d3fd2f Michael Hanselmann

933 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
934 26d3fd2f Michael Hanselmann

935 26d3fd2f Michael Hanselmann
    """
936 26d3fd2f Michael Hanselmann
    op = self.op
937 26d3fd2f Michael Hanselmann
938 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
939 26d3fd2f Michael Hanselmann
    # for locks?
940 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
941 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
942 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
943 26d3fd2f Michael Hanselmann
      op.priority -= 1
944 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
945 26d3fd2f Michael Hanselmann
      return True
946 26d3fd2f Michael Hanselmann
947 26d3fd2f Michael Hanselmann
    return False
948 26d3fd2f Michael Hanselmann
949 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
950 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
951 26d3fd2f Michael Hanselmann

952 26d3fd2f Michael Hanselmann
    """
953 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
954 26d3fd2f Michael Hanselmann
955 b80cc518 Michael Hanselmann
956 be760ba8 Michael Hanselmann
class _JobProcessor(object):
957 75d81fc8 Michael Hanselmann
  (DEFER,
958 75d81fc8 Michael Hanselmann
   WAITDEP,
959 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
960 75d81fc8 Michael Hanselmann
961 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    The arguments are only stored here; nothing is executed.

    @param queue: Job queue
    @param opexec_fn: Function used for executing opcodes
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout strategy

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
970 be760ba8 Michael Hanselmann
971 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Execution context for the first opcode not yet finalized
    @raise errors.ProgrammerError: if an opcode is marked as running or if
        no opcode is left

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run; the iterator is stored on the job, hence the
    # search continues where the previous call stopped
    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

    # Iterator exhausted without finding a pending opcode
    raise errors.ProgrammerError("Called for a finished job")
1010 be760ba8 Michael Hanselmann
1011 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The opcode's result is reset. The start timestamps of the opcode and of
    the job are set if they are still unset.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or one of the timestamps was modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    modified = False

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      modified = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      modified = True

    if job.start_timestamp is None:
      # The job inherits the start time of the opcode
      job.start_timestamp = op.start_timestamp
      modified = True

    assert op.status == constants.OP_STATUS_WAITING

    return modified
1046 be760ba8 Michael Hanselmann
1047 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are taken from the front of C{opctx.jobdeps} one at a time
    and removed once the dependency manager reports that processing can
    continue.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker
    @raise errors.ProgrammerError: if the dependency manager returns an
        unknown result

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)
        continue

      if depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (_JobDependencyManager.WRONGSTATUS,
                       _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    # No dependencies (left)
    return False
1102 b95479a5 Michael Hanselmann
1103 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1104 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1105 be760ba8 Michael Hanselmann

1106 be760ba8 Michael Hanselmann
    """
1107 b80cc518 Michael Hanselmann
    op = opctx.op
1108 b80cc518 Michael Hanselmann
1109 e6e17529 Hrvoje Ribicic
    assert op.status in (constants.OP_STATUS_WAITING,
1110 e6e17529 Hrvoje Ribicic
                         constants.OP_STATUS_CANCELING)
1111 e6e17529 Hrvoje Ribicic
1112 e6e17529 Hrvoje Ribicic
    # The very last check if the job was cancelled before trying to execute
1113 e6e17529 Hrvoje Ribicic
    if op.status == constants.OP_STATUS_CANCELING:
1114 e6e17529 Hrvoje Ribicic
      return (constants.OP_STATUS_CANCELING, None)
1115 be760ba8 Michael Hanselmann
1116 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1117 26d3fd2f Michael Hanselmann
1118 be760ba8 Michael Hanselmann
    try:
1119 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1120 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1121 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1122 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1123 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1124 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1125 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1126 9e49dfc5 Michael Hanselmann
1127 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1128 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1129 9e49dfc5 Michael Hanselmann
1130 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1131 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1132 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1133 9e49dfc5 Michael Hanselmann
1134 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1135 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1136 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1137 942e2262 Michael Hanselmann
1138 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1139 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1140 be760ba8 Michael Hanselmann
    except CancelJob:
1141 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1142 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1143 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1144 942e2262 Michael Hanselmann
1145 942e2262 Michael Hanselmann
    except QueueShutdown:
1146 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1147 942e2262 Michael Hanselmann
1148 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1149 942e2262 Michael Hanselmann
1150 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1151 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1152 942e2262 Michael Hanselmann
1153 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1154 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1155 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1156 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1157 be760ba8 Michael Hanselmann
    else:
1158 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1159 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1160 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1161 be760ba8 Michael Hanselmann
1162 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    Processes at most one opcode of the job per call. The queue lock is
    acquired in shared mode for the duration of the call and is only given
    up temporarily while the opcode itself is being executed. The lock is
    always released on exit, even if the final consistency check fails.

    @param _nextop_fn: Callback function for tests; only invoked when
      assertions are enabled and a new opcode has to be looked up
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
    @raise errors.ProgrammerError: if the opcode ends up in a status this
      method does not know how to handle

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check: nothing after the current opcode may have started
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      # Becomes true if the dependency manager takes over re-scheduling
      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          # The queue lock is not held while the opcode is executed; it is
          # re-acquired no matter how the execution ends
          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          # All opcodes not yet finished are marked as failed as well
          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      try:
        assert job.writable, "Job became read-only while being processed"
      finally:
        # Release the queue lock even if the check above fails; otherwise a
        # failed assertion would leave the lock held
        queue.release()
1354 be760ba8 Michael Hanselmann
1355 be760ba8 Michael Hanselmann
1356 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts on the value returned by a L{_JobProcessor} run.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    with the job ID once the job has finished
  @param job: Job which was processed
  @param result: Value returned by the job processor
  @raise workerpool.DeferTask: if the job has to be scheduled again
  @raise errors.ProgrammerError: if C{result} is not a known status

  """
  if result == _JobProcessor.FINISHED:
    # Jobs waiting for this one can go ahead
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Ask for the job to be scheduled again with its current priority
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do, the dependency manager will re-schedule the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1377 df5a5730 Michael Hanselmann
1378 df5a5730 Michael Hanselmann
1379 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread implementation processing jobs.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding its per-job processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # A job can show up in the task list more than once: if it registers for
    # a dependency job and that job notifies before this worker is done, the
    # job is added again. The per-job lock keeps a single worker active on it.
    processor_lock = job.processor_lock
    processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Names the worker task after the job and, if given, the opcode."""
      return self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcode execution goes through a wrapper keeping the name up to date
    wrapped_execop = compat.partial(self._WrapExecOpCode, _SetName,
                                    proc.ExecOpCode)

    depmgr = queue.depmgr
    outcome = _JobProcessor(queue, wrapped_execop, job)()
    _EvaluateJobProcessorResult(depmgr, job, outcome)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the worker name reflecting that opcode.

    The name is set for the opcode before execution and reset (by calling
    C{setname_fn} with C{None}) afterwards, whether or not execution raised.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string
    @return: C{Job<id>}, followed by a slash and the opcode's tiny summary
      if an opcode was given

    """
    name = "Job%s" % job.id

    if op:
      name = "/".join([name, op.TinySummary()])

    return name
1451 0aeeb6e3 Michael Hanselmann
1452 e2715f69 Michael Hanselmann
1453 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} workers for a job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it belongs to.

    The base class is set up with the name C{"Jq"}, C{JOBQUEUE_THREADS} and
    the L{_JobQueueWorker} class.

    @param queue: Job queue object, stored as C{self.queue}

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    self.queue = queue
1462 e2715f69 Michael Hanselmann
1463 e2715f69 Michael Hanselmann
1464 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1465 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1466 b95479a5 Michael Hanselmann

1467 b95479a5 Michael Hanselmann
  """
1468 b95479a5 Michael Hanselmann
  (WAIT,
1469 b95479a5 Michael Hanselmann
   ERROR,
1470 b95479a5 Michael Hanselmann
   CANCEL,
1471 b95479a5 Michael Hanselmann
   CONTINUE,
1472 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1473 b95479a5 Michael Hanselmann
1474 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1475 b95479a5 Michael Hanselmann
    """Initializes this class.
1476 b95479a5 Michael Hanselmann

1477 b95479a5 Michael Hanselmann
    """
1478 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1479 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1480 b95479a5 Michael Hanselmann
1481 b95479a5 Michael Hanselmann
    self._waiters = {}
1482 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1483 b95479a5 Michael Hanselmann
1484 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1485 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1486 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1487 fcb21ad7 Michael Hanselmann

1488 fcb21ad7 Michael Hanselmann
    @type requested: set
1489 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1490 fcb21ad7 Michael Hanselmann

1491 fcb21ad7 Michael Hanselmann
    """
1492 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1493 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1494 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1495 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1496 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1497 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1498 fcb21ad7 Michael Hanselmann
            if waiters]
1499 fcb21ad7 Michael Hanselmann
1500 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1501 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1502 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1503 b95479a5 Michael Hanselmann

1504 b95479a5 Michael Hanselmann
    """
1505 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1506 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1507 b95479a5 Michael Hanselmann
1508 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1509 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1510 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1511 b95479a5 Michael Hanselmann

1512 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1513 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1514 b95479a5 Michael Hanselmann

1515 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1516 b95479a5 Michael Hanselmann
    @param job: Job object
1517 76b62028 Iustin Pop
    @type dep_job_id: int
1518 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1519 b95479a5 Michael Hanselmann
    @type dep_status: list
1520 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1521 b95479a5 Michael Hanselmann

1522 b95479a5 Michael Hanselmann
    """
1523 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1524 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1525 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1526 b95479a5 Michael Hanselmann
1527 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1528 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1529 b95479a5 Michael Hanselmann
1530 b95479a5 Michael Hanselmann
    # Get status of dependency job
1531 b95479a5 Michael Hanselmann
    try:
1532 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1533 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1534 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1535 b95479a5 Michael Hanselmann
1536 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1537 b95479a5 Michael Hanselmann
1538 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1539 b95479a5 Michael Hanselmann
1540 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1541 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1542 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1543 b95479a5 Michael Hanselmann
      return (self.WAIT,
1544 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1545 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1546 b95479a5 Michael Hanselmann
1547 b95479a5 Michael Hanselmann
    # Remove from waiters list
1548 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1549 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1550 b95479a5 Michael Hanselmann
1551 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1552 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1553 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1554 b95479a5 Michael Hanselmann
1555 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1556 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1557 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1558 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1559 b95479a5 Michael Hanselmann
1560 b95479a5 Michael Hanselmann
    else:
1561 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1562 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1563 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1564 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1565 b95479a5 Michael Hanselmann
1566 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1567 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1568 37d76f1e Michael Hanselmann

1569 37d76f1e Michael Hanselmann
    """
1570 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1571 37d76f1e Michael Hanselmann
                   if not waiters]:
1572 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1573 37d76f1e Michael Hanselmann
1574 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1575 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1576 b95479a5 Michael Hanselmann

1577 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1578 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1579 76b62028 Iustin Pop
    @type job_id: int
1580 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1581 b95479a5 Michael Hanselmann

1582 b95479a5 Michael Hanselmann
    """
1583 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1584 b95479a5 Michael Hanselmann
1585 37d76f1e Michael Hanselmann
    self._lock.acquire()
1586 37d76f1e Michael Hanselmann
    try:
1587 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1588 37d76f1e Michael Hanselmann
1589 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1590 37d76f1e Michael Hanselmann
    finally:
1591 37d76f1e Michael Hanselmann
      self._lock.release()
1592 37d76f1e Michael Hanselmann
1593 b95479a5 Michael Hanselmann
    if jobs:
1594 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1595 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1596 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1597 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1598 b95479a5 Michael Hanselmann
1599 b95479a5 Michael Hanselmann
1600 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  The returned wrapper asserts that this attribute is not C{None} and then
  calls the decorated function with unchanged arguments, returning its
  result. The name and docstring of the decorated function are preserved
  on the wrapper.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  @type fn: callable
  @param fn: the method to be wrapped
  @rtype: callable
  @return: the wrapping function

  """
  # Imported locally so that this block does not depend on the module's
  # import list
  import functools

  @functools.wraps(fn)
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper
1623 db37da70 Michael Hanselmann
1624 db37da70 Michael Hanselmann
1625 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs. The decorated function
  must be a method of an object providing the C{_drained} and
  C{_accepting_jobs} attributes. The name and docstring of the decorated
  function are preserved on the returned wrapper.

  When the wrapped method is called, the following can be raised before
  the decorated function runs:

  @raise errors.JobQueueDrainError: if the job queue is marked for draining
  @raise errors.JobQueueError: if the job queue is not accepting jobs
    anymore (shutting down)

  @type fn: callable
  @param fn: the method to be wrapped
  @rtype: callable
  @return: the wrapping function

  """
  # Imported locally so that this block does not depend on the module's
  # import list
  import functools

  @functools.wraps(fn)
  def wrapper(self, *args, **kwargs):
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
1648 c8d0be94 Michael Hanselmann
1649 c8d0be94 Michael Hanselmann
1650 6c881c52 Iustin Pop
class JobQueue(object):
1651 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1652 db37da70 Michael Hanselmann

1653 6c881c52 Iustin Pop
  """
1654 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the in-memory state of the queue: the job cache, the big queue
    lock, the queue file lock, the last used serial, the master candidate
    nodes known to the queue, the queue size and drain flag, the job
    dependency manager and the worker pool.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with code acquiring it in shared mode as well as with code
    # acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes; only master candidates are kept, mapped
    # from node name to primary IP
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip

    # Remove master node
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
1715 711b5124 Michael Hanselmann
1716 fcd70b89 Klaus Aehlig
  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue

    Pick up a job that already is in the job queue and start/resume it.

    Queued jobs are enqueued as they are. Jobs found in waiting state have
    their unfinished opcodes reset to queued and are enqueued again. Jobs
    found running or canceling have their unfinished opcodes marked as
    failed and are finalized. If the job could be loaded, it is written
    back via L{UpdateJobUnlocked}, whatever its status.

    @param job_id: the job identifier

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status == constants.JOB_STATUS_WAITING:
      logging.warning("Unfinished job %s found: %s", job.id, job)
      job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)
      shutdown_error = errors.OpExecError("Unclean master daemon shutdown")
      job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                            _EncodeOpError(shutdown_error))
      job.Finalize()

    self.UpdateJobUnlocked(job)
1749 fcd70b89 Klaus Aehlig
1750 fcd70b89 Klaus Aehlig
  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    """Locked variant of L{_PickupJobUnlocked}.

    @param job_id: the job identifier

    """
    self._PickupJobUnlocked(job_id)
1753 fcd70b89 Klaus Aehlig
1754 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    @param address_list: passed through unchanged to
      C{rpc.JobQueueRunner}
    @return: a new C{rpc.JobQueueRunner} bound to this queue's context

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1759 fb1ffbca Michael Hanselmann
1760 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. Nodes which are not
    master candidates are then only dropped from the list of known nodes.
    Master candidates receive all non-archived job files, the serial file
    and the current drain flag before being added to the list of known
    nodes. RPC failures are logged and otherwise ignored.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    purge_result = self._GetRpc(None).call_jobqueue_purge(node_name)
    purge_error = purge_result.fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    upload_paths = []
    for job_id in self._GetJobIDsUnlocked():
      upload_paths.append(self._GetJobPath(job_id))

    # Upload current serial file
    upload_paths.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for path in upload_paths:
      # Read file content
      payload = utils.ReadFile(path)

      upload_result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                                    path, payload)
      upload_error = upload_result[node_name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, node_name, upload_error)

    # Set queue drained flag
    runner = self._GetRpc(addrs)
    drain_result = runner.call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    drain_error = drain_result[node_name].fail_msg
    if drain_error:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, drain_error)

    self._nodes[node_name] = node.primary_ip
1815 d2e03a33 Michael Hanselmann
1816 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Forgets the node in the queue's list of known nodes; unknown names are
    silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not known to the queue, nothing to do
      pass
1826 23752136 Michael Hanselmann
1827 7e950d31 Iustin Pop
  @staticmethod
1828 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1829 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1830 ea03467c Iustin Pop

1831 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1832 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1833 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1834 ea03467c Iustin Pop

1835 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1836 ea03467c Iustin Pop
    @type nodes: list
1837 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1838 ea03467c Iustin Pop
    @type failmsg: str
1839 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1840 ea03467c Iustin Pop

1841 ea03467c Iustin Pop
    """
1842 e74798c1 Michael Hanselmann
    failed = []
1843 e74798c1 Michael Hanselmann
    success = []
1844 e74798c1 Michael Hanselmann
1845 e74798c1 Michael Hanselmann
    for node in nodes:
1846 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1847 c8457ce7 Iustin Pop
      if msg:
1848 e74798c1 Michael Hanselmann
        failed.append(node)
1849 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1850 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1851 c8457ce7 Iustin Pop
      else:
1852 c8457ce7 Iustin Pop
        success.append(node)
1853 e74798c1 Michael Hanselmann
1854 e74798c1 Michael Hanselmann
    # +1 for the master node
1855 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1856 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1857 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1858 e74798c1 Michael Hanselmann
1859 99aabbed Iustin Pop
  def _GetNodeIp(self):
1860 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1861 99aabbed Iustin Pop

1862 ea03467c Iustin Pop
    @rtype: (list, list)
1863 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1864 ea03467c Iustin Pop
        names and the second one with the node addresses
1865 ea03467c Iustin Pop

1866 99aabbed Iustin Pop
    """
1867 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1868 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1869 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1870 99aabbed Iustin Pop
    return name_list, addr_list
1871 99aabbed Iustin Pop
1872 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The file is written with the master daemon's user, the daemons' group
    and the job queue file permissions. When replication is requested the
    new contents are pushed to all nodes known to the queue and the RPC
    results are checked.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if not replicate:
      return

    (node_names, node_addrs) = self._GetNodeIp()
    runner = self._GetRpc(node_addrs)
    rpc_result = _CallJqUpdate(runner, node_names, file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
1895 23752136 Michael Hanselmann
1896 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    All renames are done in the local queue directory first (creating
    target directories where needed); afterwards the same list is sent to
    all other nodes known to the queue and the RPC results are checked.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (source, target) in rename:
      utils.RenameFile(source, target, mkdir=True)

    # ... and on all nodes
    (node_names, node_addrs) = self._GetNodeIp()
    runner = self._GetRpc(node_addrs)
    rpc_result = runner.call_jobqueue_rename(node_names, rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
1914 abc1f2ce Michael Hanselmann
1915 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @param job_id: the job identifier; it is formatted with C{%s} into the
      file name
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(pathutils.QUEUE_DIR, file_name)
1926 f1da30e6 Michael Hanselmann
1927 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @param job_id: the job identifier; it is formatted with C{%s} into the
      file name
    @rtype: str
    @return: the path to the archived job file

    """
    archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
    sub_directory = jstore.GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(archive_root, sub_directory, file_name)
1940 0cb94105 Michael Hanselmann
1941 cb66225d Michael Hanselmann
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    The queue directory always comes first; if requested, it is followed
    by one path for each visible entry of the archive directory.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    directories = [pathutils.QUEUE_DIR]

    if not archived:
      return directories

    archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
    for entry in utils.ListVisibleFiles(archive_root):
      directories.append(utils.PathJoin(archive_root, entry))

    return directories
1958 0422250e Michael Hanselmann
1959 0422250e Michael Hanselmann
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to also look into the directories for
      archived jobs
    @rtype: list
    @return: the list of job IDs, as integers

    """
    job_ids = []

    for directory in cls._DetermineJobDirectories(archived):
      for name in utils.ListVisibleFiles(directory):
        match = constants.JOB_FILE_RE.match(name)
        if not match:
          # Not a job file
          continue
        job_ids.append(int(match.group(1)))

    if sort:
      job_ids.sort()

    return job_ids
1984 911a495b Iustin Pop
1985 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1986 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1987 ea03467c Iustin Pop

1988 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1989 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1990 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1991 ea03467c Iustin Pop

1992 76b62028 Iustin Pop
    @type job_id: int
1993 ea03467c Iustin Pop
    @param job_id: the job id
1994 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1995 ea03467c Iustin Pop
    @return: either None or the job object
1996 ea03467c Iustin Pop

1997 ea03467c Iustin Pop
    """
1998 95a4e33f Hrvoje Ribicic
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1999 95a4e33f Hrvoje Ribicic
2000 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
2001 5685c1a5 Michael Hanselmann
    if job:
2002 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
2003 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
2004 5685c1a5 Michael Hanselmann
      return job
2005 ac0930b9 Iustin Pop
2006 3d6c5566 Guido Trotter
    try:
2007 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
2008 aa9f8167 Iustin Pop
      if job is None:
2009 aa9f8167 Iustin Pop
        return job
2010 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
2011 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
2012 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
2013 3d6c5566 Guido Trotter
      if old_path == new_path:
2014 3d6c5566 Guido Trotter
        # job already archived (future case)
2015 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
2016 3d6c5566 Guido Trotter
      else:
2017 3d6c5566 Guido Trotter
        # non-archived case
2018 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
2019 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
2020 3d6c5566 Guido Trotter
      return None
2021 162c8636 Guido Trotter
2022 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
2023 c0f6d0d8 Michael Hanselmann
2024 162c8636 Guido Trotter
    self._memcache[job_id] = job
2025 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
2026 162c8636 Guido Trotter
    return job
2027 162c8636 Guido Trotter
2028 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    The file in the queue directory is tried first; the archived file is
    only tried if requested and if the former does not exist.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the restored job is to be writable; if C{None},
      the job is writable unless it was read from the archive
    @rtype: L{_QueuedJob} or None
    @return: either None (no file found, or the file was empty) or the job
      object
    @raise errors.JobFileCorrupted: if the file contents can not be turned
      into a job

    """
    candidates = [(self._GetJobPath, False)]
    if try_archived:
      candidates.append((self._GetArchivedJobPath, True))

    contents = None
    archived = None

    for (path_fn, archived) in candidates:
      job_file = path_fn(job_id)
      logging.debug("Loading job from %s", job_file)
      try:
        contents = utils.ReadFile(job_file)
      except EnvironmentError as err:
        if err.errno != errno.ENOENT:
          raise
        # File does not exist, try the next location (if any)
        continue
      # File was read, no need to look any further
      break

    if not contents:
      return None

    if writable is None:
      writable = not archived

    try:
      return _QueuedJob.Restore(self, serializer.LoadJson(contents),
                                writable, archived)
    except Exception as err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)
2073 f1da30e6 Michael Hanselmann
2074 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2075 0f9c08dc Guido Trotter
    """Load the given job file from disk.
2076 0f9c08dc Guido Trotter

2077 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2078 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
2079 0f9c08dc Guido Trotter
    exception is logged.
2080 0f9c08dc Guido Trotter

2081 76b62028 Iustin Pop
    @type job_id: int
2082 0f9c08dc Guido Trotter
    @param job_id: job identifier
2083 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2084 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2085 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
2086 0f9c08dc Guido Trotter
    @return: either None or the job object
2087 0f9c08dc Guido Trotter

2088 0f9c08dc Guido Trotter
    """
2089 0f9c08dc Guido Trotter
    try:
2090 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2091 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
2092 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
2093 0f9c08dc Guido Trotter
      return None
2094 0f9c08dc Guido Trotter
2095 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    The size is the number of job IDs returned by L{_GetJobIDsUnlocked}
    with its default of not including archived jobs.

    """
    # Only the count is needed, hence no sorting
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)
2100 20571a26 Guido Trotter
2101 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is changed locally first (through C{jstore} and in memory) and
    then on all other nodes known to the queue; the RPC results are
    checked.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always C{True}

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    # ... and on all nodes
    (node_names, node_addrs) = self._GetNodeIp()
    runner = self._GetRpc(node_addrs)
    rpc_result = runner.call_jobqueue_set_drain_flag(node_names, drain_flag)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True
2123 3ccafd0e Iustin Pop
2124 704b51ff Klaus Aehlig
  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    The request is forwarded through a luxi client connected to the query
    socket.

    @param ops: passed on unchanged to the client's C{SubmitJob}
    @return: whatever the client's C{SubmitJob} returns

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitJob(ops)
2130 2971c913 Iustin Pop
2131 704b51ff Klaus Aehlig
  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained. The request is forwarded
    through a luxi client connected to the query socket.

    @param ops: passed on unchanged to the client's
      C{SubmitJobToDrainedQueue}
    @return: whatever the client's C{SubmitJobToDrainedQueue} returns

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitJobToDrainedQueue(ops)
2140 346c3037 Klaus Aehlig
2141 704b51ff Klaus Aehlig
  @classmethod
2142 704b51ff Klaus Aehlig
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    The request is handed to a luxi client created for
    C{pathutils.QUERY_SOCKET}.

    @param jobs: passed unchanged to the client's C{SubmitManyJobs}
    @return: whatever the client's C{SubmitManyJobs} returns

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitManyJobs(jobs)
2147 2971c913 Iustin Pop
2148 b247c6fc Michael Hanselmann
  @staticmethod
2149 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    @param msg: error message placed at the start of the result
    @param ops: iterable of objects providing a C{Summary()} method
    @rtype: string
    @return: the message followed by the comma-joined opcode summaries

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2155 b247c6fc Michael Hanselmann
2156 b247c6fc Michael Hanselmann
  @staticmethod
2157 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID; an
      C{IndexError} raised by it aborts the resolution
    @type deps: list
    @param deps: Dependencies, as pairs of (job ID, status)
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Not a relative ID, keep it exactly as given
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0
      try:
        absolute_id = resolve_fn(dep_job_id)
      except IndexError:
        # Give up on the whole list as soon as one ID can't be resolved
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((absolute_id, dep_status))

    return (True, resolved)
2186 b247c6fc Michael Hanselmann
2187 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2188 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2189 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2190 7b5c4a69 Michael Hanselmann

2191 7b5c4a69 Michael Hanselmann
    @type jobs: list
2192 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2193 7b5c4a69 Michael Hanselmann

2194 7b5c4a69 Michael Hanselmann
    """
2195 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2196 75d81fc8 Michael Hanselmann
2197 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    Every job becomes one task with a single argument (the job itself),
    the priority returned by its C{CalcPriority()} and a task ID obtained
    through C{_GetIdAttr}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    tasks = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]
    task_ids = map(_GetIdAttr, jobs)

    self._wpool.AddManyTasks(tasks, priority=priorities, task_id=task_ids)
2208 7b5c4a69 Michael Hanselmann
2209 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2210 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2211 b95479a5 Michael Hanselmann

2212 76b62028 Iustin Pop
    @type job_id: int
2213 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2214 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2215 b95479a5 Michael Hanselmann

2216 b95479a5 Michael Hanselmann
    """
2217 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2218 b95479a5 Michael Hanselmann
2219 b95479a5 Michael Hanselmann
    # Try to load from disk
2220 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2221 c0f6d0d8 Michael Hanselmann
2222 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2223 b95479a5 Michael Hanselmann
2224 b95479a5 Michael Hanselmann
    if job:
2225 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2226 b95479a5 Michael Hanselmann
2227 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2228 b95479a5 Michael Hanselmann
2229 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2230 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    In debug mode it is asserted that the job is writable, not archived
    and that it has an end timestamp exactly when its status is one of
    C{constants.JOBS_FINALIZED}.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      lacks_end_timestamp = job.end_timestamp is None
      assert (is_finalized ^ lacks_end_timestamp)
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2253 ac0930b9 Iustin Pop
2254 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual waiting is done by a L{_WaitForJobChangesHelper} instance,
    which receives the job's file path and a function loading the job
    read-only from disk.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    loader = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                            writable=False)
    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, loader,
                  fields, prev_job_info, prev_log_serial, timeout)
2285 dfe57c22 Michael Hanselmann
2286 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2287 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2288 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @return: the result of L{_ModifyJobUnlocked} when applying the job's
      C{Cancel()} method

    """
    logging.info("Cancelling job %s", job_id)

    def _cancel(job):
      return job.Cancel()

    return self._ModifyJobUnlocked(job_id, _cancel)
2300 aebd0e4e Michael Hanselmann
2301 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2302 4679547e Michael Hanselmann
  @_RequireOpenQueue
2303 4679547e Michael Hanselmann
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @return: the result of L{_ModifyJobUnlocked}
    @raise errors.GenericError: if the priority is not one of
      C{constants.OP_PRIO_SUBMIT_VALID}

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    valid_priorities = constants.OP_PRIO_SUBMIT_VALID
    if priority not in valid_priorities:
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority,
                                 utils.CommaJoin(valid_priorities)))

    def _apply_priority(job):
      (changed, message) = job.ChangePriority(priority)
      if not changed:
        return (changed, message)

      # Keep the worker pool's view of the task in sync; the job doesn't
      # have to be in the pool at this point
      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        logging.debug("Job %s is not in workerpool at this time", job.id)

      return (changed, message)

    return self._ModifyJobUnlocked(job_id, _apply_priority)
2331 4679547e Michael Hanselmann
2332 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2333 aebd0e4e Michael Hanselmann
    """Modifies a job.
2334 aebd0e4e Michael Hanselmann

2335 aebd0e4e Michael Hanselmann
    @type job_id: int
2336 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2337 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2338 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2339 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2340 aebd0e4e Michael Hanselmann

2341 aebd0e4e Michael Hanselmann
    """
2342 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2343 188c5e0a Michael Hanselmann
    if not job:
2344 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2345 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2346 fbf0262f Michael Hanselmann
2347 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2348 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2349 c0f6d0d8 Michael Hanselmann
2350 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2351 188c5e0a Michael Hanselmann
2352 099b2870 Michael Hanselmann
    if success:
2353 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2354 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2355 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2356 fbf0262f Michael Hanselmann
2357 099b2870 Michael Hanselmann
    return (success, msg)
2358 fbf0262f Michael Hanselmann
2359 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2360 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Jobs whose status is not in C{constants.JOBS_FINALIZED} are skipped.
    For all others the job file is renamed from its queue path to its
    archive path.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
2397 78d12585 Michael Hanselmann
2398 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2399 07cd723a Iustin Pop
  @_RequireOpenQueue
2400 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; C{False} also if the job could not
      be loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived_count = self._ArchiveJobsUnlocked([job])
      return archived_count == 1

    logging.debug("Job %s not found", job_id)
    return False
2419 07cd723a Iustin Pop
2420 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2421 07cd723a Iustin Pop
  @_RequireOpenQueue
2422 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered (or the received timestamp if the job
    has no start timestamp either). The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: number of seconds after which no further jobs are
      examined
    @rtype: tuple; (int, int)
    @return: the number of archived jobs and the number of jobs which were
      not examined because the timeout was reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after the timeout check; otherwise
      # the job at which the loop stops is missing from the number of
      # unexamined jobs returned below
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2475 07cd723a Iustin Pop
2476 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it refers to.

    @param fields: requested fields, passed to C{query.Query}
    @param qfilter: query filter, passed to C{query.Query}
    @rtype: tuple
    @return: the query object, a list of C{(job ID, job)} pairs and a
      boolean telling whether all jobs were listed (i.e. the query did not
      name specific jobs)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()
    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    loaded = []
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if list_all and job is None:
        # When listing everything, jobs which can't be loaded are left
        # out; explicitly requested ones are kept with a None job
        continue
      loaded.append((job_id, job))

    return (qobj, loaded, list_all)
2502 e07f7f7a Michael Hanselmann
2503 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the result of C{query.GetQueryResponse} for the matching jobs,
      not sorted by name

    """
    (qobj, job_ctx, _) = self._Query(fields, qfilter)

    response = query.GetQueryResponse(qobj, job_ctx, sort_by_name=False)
    return response
2515 e07f7f7a Michael Hanselmann
2516 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # backwards compat: job IDs are converted to integers. None, which is
    # documented as "all jobs", must not be iterated over and is passed on
    # unchanged.
    # NOTE(review): assumes qlang.MakeSimpleFilter accepts None and then
    # produces a filter selecting all jobs - confirm
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2535 e2715f69 Michael Hanselmann
2536 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2537 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    On the first call the queue is marked as no longer accepting jobs and
    the worker pool is told to stop processing pending tasks. Every call
    reports whether the worker pool still has running tasks; as long as
    that is the case, the job queue is not yet ready for shutdown.

    @rtype: bool
    @return: Whether there are any running jobs

    """
    pool = self._wpool

    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      pool.SetActive(False)

    return pool.HasRunningTasks()
2560 6d5ea385 Michael Hanselmann
2561 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool
    @return: the current value of the C{_accepting_jobs} attribute

    """
    accepting = self._accepting_jobs
    return accepting
2571 942e2262 Michael Hanselmann
2572 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2573 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2574 e2715f69 Michael Hanselmann
  def Shutdown(self):
    """Stops the job queue.

    This terminates all the worker threads and closes the queue: the queue
    file lock is closed and the reference to it is dropped.

    """
    self._wpool.TerminateWorkers()

    filelock = self._queue_filelock
    filelock.Close()
    self._queue_filelock = None