Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 95a4e33f

History | View | Annotate | Download (79.9 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 95a4e33f Hrvoje Ribicic
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 99bd4f0a Guido Trotter
from ganeti import locking
51 f1da30e6 Michael Hanselmann
from ganeti import opcodes
52 7a1ecaed Iustin Pop
from ganeti import errors
53 e2715f69 Michael Hanselmann
from ganeti import mcpu
54 7996a135 Iustin Pop
from ganeti import utils
55 04ab05ce Michael Hanselmann
from ganeti import jstore
56 c3f0a12f Iustin Pop
from ganeti import rpc
57 82b22e19 René Nussbaumer
from ganeti import runtime
58 a744b676 Manuel Franceschini
from ganeti import netutils
59 989a8bee Michael Hanselmann
from ganeti import compat
60 b95479a5 Michael Hanselmann
from ganeti import ht
61 a06c6ae8 Michael Hanselmann
from ganeti import query
62 a06c6ae8 Michael Hanselmann
from ganeti import qlang
63 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
64 cffbbae7 Michael Hanselmann
from ganeti import vcluster
65 e2715f69 Michael Hanselmann
66 fbf0262f Michael Hanselmann
67 1daae384 Iustin Pop
#: Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")
75 99fb250b Michael Hanselmann
76 498ae1cc Iustin Pop
77 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised while an opcode is being processed to signal that the job was
  asked to cancel.

  """
81 fbf0262f Michael Hanselmann
82 fbf0262f Michael Hanselmann
83 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """
87 942e2262 Michael Hanselmann
88 942e2262 Michael Hanselmann
89 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
97 70552c46 Michael Hanselmann
98 70552c46 Michael Hanselmann
99 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  @param runner: object providing C{call_jobqueue_update}
  @param names: passed unchanged as first argument to
    C{runner.call_jobqueue_update}
  @param file_name: path of the job queue file; converted using
    L{vcluster.MakeVirtualPath} before being passed on
  @param content: passed unchanged as the file content
  @return: whatever C{runner.call_jobqueue_update} returns

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)
105 cffbbae7 Michael Hanselmann
106 cffbbae7 Michael Hanselmann
107 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    @param fields: names of the job fields to query; used to build a
      L{query.Query} over L{query.JOB_FIELDS} once, at construction time

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    @param job: job object; must provide an C{id} attribute
    @return: the first (and only) row of the old-style query result for
      this job

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124 a06c6ae8 Michael Hanselmann
125 a06c6ae8 Michael Hanselmann
126 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar priority: the opcode priority; taken from the opcode on creation,
    L{constants.OP_PRIO_DEFAULT} if the opcode has none
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The keys C{input}, C{status}, C{result} and C{log} are mandatory; the
    timestamps default to C{None} and the priority to
    L{constants.OP_PRIO_DEFAULT} when missing.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # Bypass __init__, all attributes are filled from the serialized state
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp")
    obj.exec_timestamp = state.get("exec_timestamp")
    obj.end_timestamp = state.get("end_timestamp")
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
199 f1048938 Iustin Pop
200 e2715f69 Michael Hanselmann
201 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified
  @ivar archived: Whether the job was already archived

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def _AddReasons(self):
    """Extend the reason trail

    Add the reason for all the opcodes of this job to be executed. Each
    opcode gets an entry of the form C{(source, "job=<id>;index=<n>",
    timestamp)} appended to its C{reason} attribute.

    """
    for (idx, queued_op) in enumerate(self.ops):
      op = queued_op.input
      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
      reason_text = "job=%d;index=%d" % (self.id, idx)

      # A missing trail and a trail explicitly set to None both start a
      # fresh list; an existing list is extended in place
      reason = getattr(op, "reason", None)
      if reason is None:
        reason = []

      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id (converted using C{int()})
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: when C{ops} is empty

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self._AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    These attributes are not part of the serialized state and are set both
    for newly created and for restored jobs.

    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    """Returns a description containing class, job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # Bypass __init__, all attributes are filled from the serialized state
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp")
    obj.start_timestamp = state.get("start_timestamp")
    obj.end_timestamp = state.get("end_timestamp")
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide log serial is the highest one found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status constant, not the opcode one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    # The serial is the first element of every log entry
    return [entry
            for op in self.ops
            for entry in op.log
            if entry[0] > serial]

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Sets the end timestamp to the current time.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    Only opcodes which are queued or waiting are modified; running and
    finalized opcodes keep their priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)
545 4679547e Michael Hanselmann
546 f1048938 Iustin Pop
547 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Binds the callbacks to a queue, a job and one of the job's opcodes.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts processing if a cancellation or a queue shutdown is pending.

    @raise CancelJob: if the opcode status is C{OP_STATUS_CANCELING}
    @raise QueueShutdown: if the queue reports it isn't accepting jobs

    """
    # An explicit cancellation request is looked at first
    cancel_requested = (self._op.status == constants.OP_STATUS_CANCELING)
    if cancel_requested:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # Otherwise stop if the queue is on its way down
    accepting = self._queue.AcceptingJobsUnlocked()
    if not accepting:
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from lock-waiting to running.

    The mcpu code calls this as a notifier right before the LU's Exec()
    method starts. For the change to be visible to end-users, the opcode has
    to be in OP_STATUS_WAITING before Processor.ExecOpCode is called.

    """
    op = self._op
    job = self._job

    assert op in job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Last chance to honour a cancellation before the opcode runs
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Write out (and replicate) the new job state
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds one log entry to the opcode while holding the queue lock.

    The job's log serial is incremented and used as the entry's serial; the
    job is then updated without replication.

    """
    job = self._job
    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the type defaults to
    C{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # Splitting the time keeps serialization simple without losing precision
    self._AppendFeedback(utils.SplitTime(time.time()), log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    Also gives a pending cancellation or queue shutdown the chance to
    interrupt processing, see L{_CheckCancel}.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue takes care of it
    return self._queue.SubmitManyJobs(jobs)
652 6a373640 Michael Hanselmann
653 031a3e57 Michael Hanselmann
654 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Remembers what the client last saw of a job.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @param prev_job_info: previous job info, as passed by the LUXI client
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    (self._prev_job_info, self._prev_log_serial) = \
      (prev_job_info, prev_log_serial)

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job is not in a queued, running
      or waiting state, if its info differs from the previous one or if there
      are log entries whose first serial differs from the previous serial;
      C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # A round-trip through the serializer may change types (e.g. tuple to
    # list) or lose precision. The client's data went through the same
    # treatment, so doing it here avoids spurious differences in the
    # comparison below.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    (job_info, log_entries) = \
      [serializer.LoadJson(serializer.DumpJson(data))
       for data in (job_info, log_entries)]

    unfinished = (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_RUNNING,
                  constants.JOB_STATUS_WAITING)

    if status not in unfinished:
      # No point in waiting for a job which is no longer running, there will
      # be no further changes
      changed = True
    elif job_info != self._prev_job_info:
      changed = True
    else:
      changed = bool(log_entries and
                     self._prev_log_serial != log_entries[0][0])

    if not changed:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
706 989a8bee Michael Hanselmann
707 989a8bee Michael Hanselmann
708 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Sets up an inotify notifier for a job file.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = wm = _inotify_wm_cls()
    self._inotify_handler = handler = \
      asyncnotifier.SingleFileEventHandler(wm, self._OnInotify, filename)
    self._notifier = notifier = \
      pyinotify.Notifier(wm, default_proc_fun=handler)

    try:
      handler.enable()
    except Exception:
      # File descriptors are not closed automatically by pyinotify, hence the
      # notifier is stopped by hand before passing the error on
      notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Re-enables the event handler if it is reported as disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is passed on multiplied by 1000
    have_events = notifier.check_events(timeout * 1000)
    if have_events:
      notifier.read_events()
    notifier.process_events()

    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
756 989a8bee Michael Hanselmann
757 989a8bee Michael Hanselmann
758 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Prepares a waiter without creating the underlying file waiter yet.

    @type filename: string
    @param filename: Path to job file

    """
    self._filename = filename
    self._waiter_cls = _waiter_cls
    # Created on the first call to L{Wait}
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if not self._filewaiter:
      # Lazy setup: the inotify setup cost is avoided when the job file has
      # already changed. The first call only creates the file waiter and
      # returns right away, so the caller checks the job file once more in
      # case it changed since the last check. This avoids a race condition.
      self._filewaiter = self._waiter_cls(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes underlying waiter.

    Nothing is done if the file waiter was never created.

    """
    filewaiter = self._filewaiter
    if filewaiter:
      filewaiter.Close()
795 989a8bee Michael Hanselmann
796 989a8bee Michael Hanselmann
797 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job once and checks it for changes.

    @param counter: Iterator whose next value is the number of checks already
      done
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Called with the loaded job; returns C{None} if the job
      has not changed
    @return: The (non-C{None}) result of C{check_fn}
    @raise errors.JobLost: if C{job_load_fn} did not return a job
    @raise utils.RetryAgain: if C{check_fn} found no change

    """
    # The builtin next() is used as the iterator's ".next()" method exists in
    # Python 2 only
    if next(counter) > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @return: C{None} if the job could not be loaded,
      C{constants.JOB_NOTCHANGED} if the retry loop timed out, otherwise the
      result of the L{_JobChangesChecker} instance

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # The waiter is always closed, whatever the outcome
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
856 6c2549d6 Guido Trotter
857 6c2549d6 Guido Trotter
858 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Anything which is not an L{errors.GenericError} is first wrapped, using its
  string representation, in an L{errors.OpExecError}.

  @param err: The error to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
868 6760e4ed Michael Hanselmann
869 6760e4ed Michael Hanselmann
870 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
871 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
872 26d3fd2f Michael Hanselmann
    """Initializes this class.
873 26d3fd2f Michael Hanselmann

874 26d3fd2f Michael Hanselmann
    """
875 26d3fd2f Michael Hanselmann
    self._fn = fn
876 26d3fd2f Michael Hanselmann
    self._next = None
877 26d3fd2f Michael Hanselmann
878 26d3fd2f Michael Hanselmann
  def _Advance(self):
879 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
880 26d3fd2f Michael Hanselmann

881 26d3fd2f Michael Hanselmann
    """
882 26d3fd2f Michael Hanselmann
    if self._next is None:
883 26d3fd2f Michael Hanselmann
      self._next = self._fn()
884 26d3fd2f Michael Hanselmann
885 26d3fd2f Michael Hanselmann
  def Peek(self):
886 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
887 26d3fd2f Michael Hanselmann

888 26d3fd2f Michael Hanselmann
    """
889 26d3fd2f Michael Hanselmann
    self._Advance()
890 26d3fd2f Michael Hanselmann
    return self._next
891 26d3fd2f Michael Hanselmann
892 26d3fd2f Michael Hanselmann
  def Next(self):
893 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
894 26d3fd2f Michael Hanselmann

895 26d3fd2f Michael Hanselmann
    """
896 26d3fd2f Michael Hanselmann
    self._Advance()
897 26d3fd2f Michael Hanselmann
    result = self._next
898 26d3fd2f Michael Hanselmann
    self._next = None
899 26d3fd2f Michael Hanselmann
    return result
900 26d3fd2f Michael Hanselmann
901 26d3fd2f Michael Hanselmann
902 b80cc518 Michael Hanselmann
class _OpExecContext:
903 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904 b80cc518 Michael Hanselmann
    """Initializes this class.
905 b80cc518 Michael Hanselmann

906 b80cc518 Michael Hanselmann
    """
907 b80cc518 Michael Hanselmann
    self.op = op
908 b80cc518 Michael Hanselmann
    self.index = index
909 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
910 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
911 b80cc518 Michael Hanselmann
912 b95479a5 Michael Hanselmann
    # Create local copy to modify
913 b95479a5 Michael Hanselmann
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
914 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
915 b95479a5 Michael Hanselmann
    else:
916 b95479a5 Michael Hanselmann
      self.jobdeps = None
917 b95479a5 Michael Hanselmann
918 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
919 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
920 26d3fd2f Michael Hanselmann
921 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
922 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
923 26d3fd2f Michael Hanselmann

924 26d3fd2f Michael Hanselmann
    """
925 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
926 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
927 26d3fd2f Michael Hanselmann
928 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
929 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
930 26d3fd2f Michael Hanselmann

931 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
932 26d3fd2f Michael Hanselmann

933 26d3fd2f Michael Hanselmann
    """
934 26d3fd2f Michael Hanselmann
    op = self.op
935 26d3fd2f Michael Hanselmann
936 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
937 26d3fd2f Michael Hanselmann
    # for locks?
938 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
939 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
940 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
941 26d3fd2f Michael Hanselmann
      op.priority -= 1
942 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
943 26d3fd2f Michael Hanselmann
      return True
944 26d3fd2f Michael Hanselmann
945 26d3fd2f Michael Hanselmann
    return False
946 26d3fd2f Michael Hanselmann
947 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
948 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
949 26d3fd2f Michael Hanselmann

950 26d3fd2f Michael Hanselmann
    """
951 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
952 26d3fd2f Michael Hanselmann
953 b80cc518 Michael Hanselmann
954 be760ba8 Michael Hanselmann
class _JobProcessor(object):
955 75d81fc8 Michael Hanselmann
  (DEFER,
956 75d81fc8 Michael Hanselmann
   WAITDEP,
957 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
958 75d81fc8 Michael Hanselmann
959 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores the objects needed to process a job.

    @param queue: Stored as C{queue}
    @param opexec_fn: Stored as C{opexec_fn}
    @param job: Stored as C{job}
    @param _timeout_strategy_factory: Callable creating timeout strategies

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
968 be760ba8 Michael Hanselmann
969 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Context for the first opcode which is not finalized yet
    @raise errors.ProgrammerError: if there are no opcodes left or an opcode
      marked as running is found

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        # The builtin next() is used as the iterator's ".next()" method exists
        # in Python 2 only
        (idx, op) = next(job.ops_iter)
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      # The context is created before the status check as its log prefix and
      # summary are needed for the message below
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)
1008 be760ba8 Michael Hanselmann
1009 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    Any previous result of the opcode is cleared. Missing start timestamps of
    the opcode and of the job are filled in.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or one of the timestamps was modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    status_changed = (op.status == constants.OP_STATUS_QUEUED)
    if status_changed:
      op.status = constants.OP_STATUS_WAITING

    op_stamped = (op.start_timestamp is None)
    if op_stamped:
      op.start_timestamp = TimeStampNow()

    # The job starts together with the first opcode to be started
    job_stamped = (job.start_timestamp is None)
    if job_stamped:
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return status_changed or op_stamped or job_stamped
1044 be760ba8 Michael Hanselmann
1045 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are handled one by one, starting with the first entry of
    C{opctx.jobdeps}; processing stops at the first one which can't simply be
    dropped.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = \
        queue.depmgr.CheckAndRegister(job, dep_job_id, dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Dependency is done with, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if depresult == _JobDependencyManager.WAIT:
        # A notification is needed first; the dependency tracker will re-add
        # the job to the workerpool
        return True

      if depresult == _JobDependencyManager.CANCEL:
        # The job depended upon was cancelled, so this one is cancelled too
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if depresult in (_JobDependencyManager.WRONGSTATUS,
                       _JobDependencyManager.ERROR):
        # Either the other job failed or an error occurred; this job fails
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   depresult)

    return False
1100 b95479a5 Michael Hanselmann
1101 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1102 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1103 be760ba8 Michael Hanselmann

1104 be760ba8 Michael Hanselmann
    """
1105 b80cc518 Michael Hanselmann
    op = opctx.op
1106 b80cc518 Michael Hanselmann
1107 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1108 be760ba8 Michael Hanselmann
1109 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1110 26d3fd2f Michael Hanselmann
1111 be760ba8 Michael Hanselmann
    try:
1112 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1113 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1114 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1115 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1116 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1117 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1118 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1119 9e49dfc5 Michael Hanselmann
1120 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1121 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1122 9e49dfc5 Michael Hanselmann
1123 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1124 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1125 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1126 9e49dfc5 Michael Hanselmann
1127 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1128 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1129 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1130 942e2262 Michael Hanselmann
1131 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1132 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1133 be760ba8 Michael Hanselmann
    except CancelJob:
1134 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1135 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1136 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1137 942e2262 Michael Hanselmann
1138 942e2262 Michael Hanselmann
    except QueueShutdown:
1139 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1140 942e2262 Michael Hanselmann
1141 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1142 942e2262 Michael Hanselmann
1143 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1144 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1145 942e2262 Michael Hanselmann
1146 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1147 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1148 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1149 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1150 be760ba8 Michael Hanselmann
    else:
1151 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1152 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1153 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1154 be760ba8 Michael Hanselmann
1155 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    Processes at most one opcode of the job per invocation. The queue lock is
    acquired in shared mode for the bookkeeping and released while the opcode
    itself is being executed.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          # Execute the opcode without holding the queue lock; the lock is
          # always re-acquired, even if execution raises
          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()
1347 be760ba8 Michael Hanselmann
1348 be760ba8 Michael Hanselmann
1349 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    when the job has finished
  @param job: Job which was processed
  @param result: Value returned by the job processor, one of
    C{_JobProcessor.FINISHED}, C{_JobProcessor.DEFER} or
    C{_JobProcessor.WAITDEP}
  @raise workerpool.DeferTask: if the job must be scheduled again
  @raise errors.ProgrammerError: if C{result} is not a known status

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))
1370 df5a5730 Michael Hanselmann
1371 df5a5730 Michael Hanselmann
1372 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    # No opcode is being executed yet, name the thread after the job only
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    The name is reset (C{setname_fn(None)}) once the opcode has been executed,
    whether or not execution raised an exception.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode, passed to both callables
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @param op: Opcode being executed; if false, only the job ID is used
    @rtype: string
    @return: C{"Job<id>"}, followed by C{"/"} and the opcode's tiny summary
      if an opcode was given

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)
1444 0aeeb6e3 Michael Hanselmann
1445 e2715f69 Michael Hanselmann
1446 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    """Initializes the pool with L{JOBQUEUE_THREADS} L{_JobQueueWorker}s.

    @param queue: Queue object, kept as C{self.queue} (workers compare it
      against the queue of the job they are given)

    """
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
1455 e2715f69 Michael Hanselmann
1456 e2715f69 Michael Hanselmann
1457 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1458 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1459 b95479a5 Michael Hanselmann

1460 b95479a5 Michael Hanselmann
  """
1461 b95479a5 Michael Hanselmann
  (WAIT,
1462 b95479a5 Michael Hanselmann
   ERROR,
1463 b95479a5 Michael Hanselmann
   CANCEL,
1464 b95479a5 Michael Hanselmann
   CONTINUE,
1465 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1466 b95479a5 Michael Hanselmann
1467 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1468 b95479a5 Michael Hanselmann
    """Initializes this class.
1469 b95479a5 Michael Hanselmann

1470 b95479a5 Michael Hanselmann
    """
1471 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1472 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1473 b95479a5 Michael Hanselmann
1474 b95479a5 Michael Hanselmann
    self._waiters = {}
1475 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1476 b95479a5 Michael Hanselmann
1477 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1478 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1479 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1480 fcb21ad7 Michael Hanselmann

1481 fcb21ad7 Michael Hanselmann
    @type requested: set
1482 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1483 fcb21ad7 Michael Hanselmann

1484 fcb21ad7 Michael Hanselmann
    """
1485 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1486 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1487 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1488 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1489 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1490 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1491 fcb21ad7 Michael Hanselmann
            if waiters]
1492 fcb21ad7 Michael Hanselmann
1493 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1494 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1495 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1496 b95479a5 Michael Hanselmann

1497 b95479a5 Michael Hanselmann
    """
1498 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1499 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1500 b95479a5 Michael Hanselmann
1501 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1502 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1503 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1504 b95479a5 Michael Hanselmann

1505 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1506 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1507 b95479a5 Michael Hanselmann

1508 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1509 b95479a5 Michael Hanselmann
    @param job: Job object
1510 76b62028 Iustin Pop
    @type dep_job_id: int
1511 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1512 b95479a5 Michael Hanselmann
    @type dep_status: list
1513 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1514 b95479a5 Michael Hanselmann

1515 b95479a5 Michael Hanselmann
    """
1516 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1517 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1518 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1519 b95479a5 Michael Hanselmann
1520 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1521 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1522 b95479a5 Michael Hanselmann
1523 b95479a5 Michael Hanselmann
    # Get status of dependency job
1524 b95479a5 Michael Hanselmann
    try:
1525 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1526 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1527 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1528 b95479a5 Michael Hanselmann
1529 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1530 b95479a5 Michael Hanselmann
1531 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1532 b95479a5 Michael Hanselmann
1533 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1534 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1535 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1536 b95479a5 Michael Hanselmann
      return (self.WAIT,
1537 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1538 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1539 b95479a5 Michael Hanselmann
1540 b95479a5 Michael Hanselmann
    # Remove from waiters list
1541 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1542 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1543 b95479a5 Michael Hanselmann
1544 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1545 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1546 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1547 b95479a5 Michael Hanselmann
1548 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1549 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1550 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1551 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1552 b95479a5 Michael Hanselmann
1553 b95479a5 Michael Hanselmann
    else:
1554 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1555 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1556 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1557 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1558 b95479a5 Michael Hanselmann
1559 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1560 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1561 37d76f1e Michael Hanselmann

1562 37d76f1e Michael Hanselmann
    """
1563 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1564 37d76f1e Michael Hanselmann
                   if not waiters]:
1565 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1566 37d76f1e Michael Hanselmann
1567 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1568 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1569 b95479a5 Michael Hanselmann

1570 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1571 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1572 76b62028 Iustin Pop
    @type job_id: int
1573 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1574 b95479a5 Michael Hanselmann

1575 b95479a5 Michael Hanselmann
    """
1576 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1577 b95479a5 Michael Hanselmann
1578 37d76f1e Michael Hanselmann
    self._lock.acquire()
1579 37d76f1e Michael Hanselmann
    try:
1580 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1581 37d76f1e Michael Hanselmann
1582 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1583 37d76f1e Michael Hanselmann
    finally:
1584 37d76f1e Michael Hanselmann
      self._lock.release()
1585 37d76f1e Michael Hanselmann
1586 b95479a5 Michael Hanselmann
    if jobs:
1587 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1588 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1589 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1590 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1591 b95479a5 Michael Hanselmann
1592 b95479a5 Michael Hanselmann
1593 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1594 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1595 ea03467c Iustin Pop

1596 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1597 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1598 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1599 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1600 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1601 ea03467c Iustin Pop

1602 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1603 f1da30e6 Michael Hanselmann

1604 6c881c52 Iustin Pop
  Example::
1605 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1606 6c881c52 Iustin Pop
    @_RequireOpenQueue
1607 6c881c52 Iustin Pop
    def Example(self):
1608 6c881c52 Iustin Pop
      pass
1609 db37da70 Michael Hanselmann

1610 6c881c52 Iustin Pop
  """
1611 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1612 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1613 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1614 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1615 6c881c52 Iustin Pop
  return wrapper
1616 db37da70 Michael Hanselmann
1617 db37da70 Michael Hanselmann
1618 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1619 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1620 c8d0be94 Michael Hanselmann

1621 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1622 c8d0be94 Michael Hanselmann

1623 c8d0be94 Michael Hanselmann
  """
1624 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1625 c8d0be94 Michael Hanselmann
    """Wrapper function.
1626 c8d0be94 Michael Hanselmann

1627 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1628 c8d0be94 Michael Hanselmann

1629 c8d0be94 Michael Hanselmann
    """
1630 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1631 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1632 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1633 c8d0be94 Michael Hanselmann
    if self._drained:
1634 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1635 6d5ea385 Michael Hanselmann
1636 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1637 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1638 6d5ea385 Michael Hanselmann
1639 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1640 c8d0be94 Michael Hanselmann
  return wrapper
1641 c8d0be94 Michael Hanselmann
1642 c8d0be94 Michael Hanselmann
1643 6c881c52 Iustin Pop
class JobQueue(object):
1644 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1645 db37da70 Michael Hanselmann

1646 6c881c52 Iustin Pop
  """
1647 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor initializes the job queue object, takes the queue file
    lock, reads the serial and drain state from disk, and then inspects the
    jobs currently on disk via L{_InspectQueue}, which resumes or finalizes
    unfinished jobs.

    If the inspection raises, the worker pool's workers are terminated and
    the exception is re-raised.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # Cache of loaded job objects keyed by job ID; values are held weakly
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    # Expose the lock's acquire/release directly on the queue object
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes: maps node name to primary IP, restricted to
    # master candidates
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # The size is filled in by _UpdateQueueSizeUnlocked from the job files
    # found on disk
    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      # Catch-all is intentional: stop the workers on any failure, then
      # re-raise the original exception unchanged
      self._wpool.TerminateWorkers()
      raise
1713 711b5124 Michael Hanselmann
1714 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Goes through all jobs on disk and resumes or fails unfinished ones.

    Jobs in status "queued" are enqueued again as they are. Jobs in status
    "waiting" get their unfinished opcodes marked as queued and are enqueued
    again, too. Jobs in status "running" or "canceling" get their unfinished
    opcodes marked as failed with an "Unclean master daemon shutdown" error
    and are finalized. Jobs which cannot be loaded are skipped.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    resumable = []

    job_ids = self._GetJobIDsUnlocked()
    total = len(job_ids)
    last_report = time.time()
    for (idx, job_id) in enumerate(job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
          idx == (total - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, total - 1, 100.0 * (idx + 1) / total)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        resumable.append(job)
        continue

      if status not in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING):
        # Nothing to do for any other status
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status != constants.JOB_STATUS_WAITING:
        # Job was running or being canceled, it can't be resumed
        failure = errors.OpExecError("Unclean master daemon shutdown")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(failure))
        job.Finalize()
      else:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        resumable.append(job)

      self.UpdateJobUnlocked(job)

    if resumable:
      logging.info("Restarting %s jobs", len(resumable))
      self._EnqueueJobsUnlocked(resumable)

    logging.info("Job queue inspection finished")
1771 85f03e0d Michael Hanselmann
1772 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Creates a job queue RPC runner bound to this queue's context.

    @param address_list: passed on unchanged to L{rpc.JobQueueRunner}
    @return: the newly created L{rpc.JobQueueRunner}

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1777 fb1ffbca Michael Hanselmann
1778 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Registers a node with the queue and seeds its queue directory.

    The queue directory on the node is purged first. A node which is not a
    master candidate is then only removed from the replication list. For a
    master candidate, all non-archived job files, the serial file and the
    drain flag are sent to the node before it is added to the replication
    list. RPC failures are only logged and do not abort the operation.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    purge_err = self._GetRpc(None).call_jobqueue_purge(node_name).fail_msg
    if purge_err:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, purge_err)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs ...
    upload = []
    for job_id in self._GetJobIDsUnlocked():
      upload.append(self._GetJobPath(job_id))

    # ... followed by the current serial file
    upload.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in upload:
      # Read file content
      content = utils.ReadFile(file_name)

      upload_res = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                                 file_name, content)
      upload_err = upload_res[node_name].fail_msg
      if upload_err:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, upload_err)

    # Set queue drained flag
    drain_res = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    drain_err = drain_res[node_name].fail_msg
    if drain_err:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, drain_err)

    self._nodes[node_name] = node.primary_ip
1833 d2e03a33 Michael Hanselmann
1834 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Drops the node from the replication list; unknown names are ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not registered, nothing to do
      pass
1844 23752136 Michael Hanselmann
1845 7e950d31 Iustin Pop
  @staticmethod
1846 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1847 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1848 ea03467c Iustin Pop

1849 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1850 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1851 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1852 ea03467c Iustin Pop

1853 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1854 ea03467c Iustin Pop
    @type nodes: list
1855 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1856 ea03467c Iustin Pop
    @type failmsg: str
1857 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1858 ea03467c Iustin Pop

1859 ea03467c Iustin Pop
    """
1860 e74798c1 Michael Hanselmann
    failed = []
1861 e74798c1 Michael Hanselmann
    success = []
1862 e74798c1 Michael Hanselmann
1863 e74798c1 Michael Hanselmann
    for node in nodes:
1864 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1865 c8457ce7 Iustin Pop
      if msg:
1866 e74798c1 Michael Hanselmann
        failed.append(node)
1867 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1868 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1869 c8457ce7 Iustin Pop
      else:
1870 c8457ce7 Iustin Pop
        success.append(node)
1871 e74798c1 Michael Hanselmann
1872 e74798c1 Michael Hanselmann
    # +1 for the master node
1873 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1874 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1875 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1876 e74798c1 Michael Hanselmann
1877 99aabbed Iustin Pop
  def _GetNodeIp(self):
1878 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1879 99aabbed Iustin Pop

1880 ea03467c Iustin Pop
    @rtype: (list, list)
1881 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1882 ea03467c Iustin Pop
        names and the second one with the node addresses
1883 ea03467c Iustin Pop

1884 99aabbed Iustin Pop
    """
1885 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1886 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1887 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1888 99aabbed Iustin Pop
    return name_list, addr_list
1889 99aabbed Iustin Pop
1890 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and, if requested, replicate it to all the other nodes we have.
    Replication failures are logged by L{_CheckRpcResult}, not raised here.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      # Verify exactly the nodes the call was sent to (the snapshot taken
      # above), instead of iterating over the live node dictionary
      self._CheckRpcResult(result, names, "Updating %s" % file_name)
1913 23752136 Michael Hanselmann
1914 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.
    Replication failures are logged by L{_CheckRpcResult}, not raised here.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    # Verify exactly the nodes the call was sent to (the snapshot taken
    # above), instead of iterating over the live node dictionary
    self._CheckRpcResult(result, names, "Renaming files (%r)" % rename)
1932 abc1f2ce Michael Hanselmann
1933 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Reserves and returns a batch of new job identifiers.

    The new highest serial is written to the serial file (and replicated)
    before the in-memory counter is advanced, so the counter only moves if
    the write did not raise.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    previous = self._last_serial

    # New number
    latest = previous + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % latest, True)

    job_ids = []
    for value in range(previous + 1, latest + 1):
      job_ids.append(jstore.FormatJobID(value))

    # Keep it only if we were able to write the file
    self._last_serial = latest

    assert len(job_ids) == count

    return job_ids
1962 f1da30e6 Michael Hanselmann
1963 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Builds the path of the (non-archived) job file for a job id.

    @param job_id: the job identifier; it is formatted with C{%s}, callers
        in this class pass integer IDs
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(pathutils.QUEUE_DIR, file_name)
1974 f1da30e6 Michael Hanselmann
1975 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Builds the path of the archived job file for a given job id.

    The file lives in the sub-directory of the archive directory which
    L{jstore.GetArchiveDirectory} returns for the job id.

    @param job_id: the job identifier; it is formatted with C{%s}, callers
        in this class pass integer IDs
    @rtype: str
    @return: the path to the archived job file

    """
    archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
    subdir = jstore.GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(archive_root, subdir, file_name)
1988 0cb94105 Michael Hanselmann
1989 cb66225d Michael Hanselmann
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Lists the directories in which job files are to be looked for.

    The queue directory always comes first. If requested, it is followed
    by every visible entry of the archive directory.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    directories = [pathutils.QUEUE_DIR]

    if not archived:
      return directories

    archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
    for entry in utils.ListVisibleFiles(archive_path):
      directories.append(utils.PathJoin(archive_path, entry))

    return directories
2006 0422250e Michael Hanselmann
2007 0422250e Michael Hanselmann
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Collects the IDs of all jobs found on disk.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs). File names not matching C{constants.JOB_FILE_RE} are
    ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to also look into the archive directories
    @rtype: list
    @return: the list of job IDs

    """
    found = []

    for directory in cls._DetermineJobDirectories(archived):
      for entry in utils.ListVisibleFiles(directory):
        match = constants.JOB_FILE_RE.match(entry)
        if not match:
          continue
        found.append(int(match.group(1)))

    if sort:
      found.sort()

    return found
2032 911a495b Iustin Pop
2033 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
2034 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
2035 ea03467c Iustin Pop

2036 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
2037 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
2038 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
2039 ea03467c Iustin Pop

2040 76b62028 Iustin Pop
    @type job_id: int
2041 ea03467c Iustin Pop
    @param job_id: the job id
2042 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
2043 ea03467c Iustin Pop
    @return: either None or the job object
2044 ea03467c Iustin Pop

2045 ea03467c Iustin Pop
    """
2046 95a4e33f Hrvoje Ribicic
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2047 95a4e33f Hrvoje Ribicic
2048 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
2049 5685c1a5 Michael Hanselmann
    if job:
2050 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
2051 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
2052 5685c1a5 Michael Hanselmann
      return job
2053 ac0930b9 Iustin Pop
2054 3d6c5566 Guido Trotter
    try:
2055 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
2056 aa9f8167 Iustin Pop
      if job is None:
2057 aa9f8167 Iustin Pop
        return job
2058 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
2059 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
2060 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
2061 3d6c5566 Guido Trotter
      if old_path == new_path:
2062 3d6c5566 Guido Trotter
        # job already archived (future case)
2063 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
2064 3d6c5566 Guido Trotter
      else:
2065 3d6c5566 Guido Trotter
        # non-archived case
2066 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
2067 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
2068 3d6c5566 Guido Trotter
      return None
2069 162c8636 Guido Trotter
2070 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
2071 c0f6d0d8 Michael Hanselmann
2072 162c8636 Guido Trotter
    self._memcache[job_id] = job
2073 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
2074 162c8636 Guido Trotter
    return job
2075 162c8636 Guido Trotter
2076 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    The non-archived job file is tried first and, if C{try_archived} is set,
    the archived one afterwards. A missing file (C{ENOENT}) is not an error;
    any other C{EnvironmentError} raised while reading is propagated.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: passed on to L{_QueuedJob.Restore}; if C{None}, it
        defaults to true for jobs read from the non-archived path and to
        false for jobs read from the archive
    @rtype: L{_QueuedJob} or None
    @return: either None (no file found, or the file was empty) or the
        job object
    @raise errors.JobFileCorrupted: if parsing or restoring the job fails

    """
    # Pairs of (function building the path, whether that path is archived)
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    # Note: "archived" is the loop variable and is used after the loop; on
    # "break" it describes the location the data was actually read from
    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        # A missing file just means we try the next location, if any
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      # Any failure while parsing/restoring is reported as a corrupted file
      raise errors.JobFileCorrupted(err)

    return job
2121 f1da30e6 Michael Hanselmann
2122 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Loads the given job file from disk without raising on load errors.

    Thin wrapper around L{_LoadJobFromDisk}: if the job file is corrupted
    or an C{EnvironmentError} occurs while reading it, the exception is
    logged and C{None} is returned instead.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed on unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2142 0f9c08dc Guido Trotter
2143 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2144 20571a26 Guido Trotter
    """Update the queue size.
2145 20571a26 Guido Trotter

2146 20571a26 Guido Trotter
    """
2147 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2148 20571a26 Guido Trotter
2149 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Set or clear the queue's drain flag, locally and on all nodes.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    # Local side first: through jstore, then the copy kept on this object
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    # Afterwards send the same value to the nodes and check the RPC results
    (node_names, node_addrs) = self._GetNodeIp()
    runner = self._GetRpc(node_addrs)
    result = runner.call_jobqueue_set_drain_flag(node_names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True
2171 3ccafd0e Iustin Pop
2172 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Validate, persist and cache a newly submitted job.

    A job object is built from the given opcodes and each opcode is checked
    for a submittable priority and for acceptable dependencies. Only then is
    the job written via L{UpdateJobUnlocked}, counted in the cached queue
    size and stored in the in-memory job cache. This method does not add
    the job to the worker pool; the job is returned for that purpose.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops, True)

    for (pos, queued_op) in enumerate(new_job.ops):
      # The priority must be one of those accepted at submission time
      if queued_op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        valid_names = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        message = ("Opcode %s has invalid priority %s, allowed"
                   " are %s" % (pos, queued_op.priority, valid_names))
        raise errors.GenericError(message)

      # The dependencies (if any) must pass TNoRelativeJobDependencies
      deps = getattr(queued_op.input, opcodes.DEPEND_ATTR, None)
      if opcodes.TNoRelativeJobDependencies(deps):
        continue

      raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                " match %s: %s" %
                                (pos, opcodes.TNoRelativeJobDependencies,
                                 deps))

    # Write the job out before accounting for it and caching it
    self.UpdateJobUnlocked(new_job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    return new_job
2218 f1da30e6 Michael Hanselmann
2219 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create, store and enqueue a single new job.

    A fresh job ID is allocated, the job is created through
    L{_SubmitJobUnlocked} and then handed to L{_EnqueueJobsUnlocked}.

    @see: L{_SubmitJobUnlocked}
    @return: the ID allocated for the new job

    """
    serials = self._NewSerialsUnlocked(1)
    (job_id, ) = serials

    job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobsUnlocked([job])

    return job_id
2231 2971c913 Iustin Pop
2232 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create, store and enqueue multiple jobs.

    One job ID per entry in C{jobs} is allocated up front; the jobs are then
    submitted through L{_SubmitManyJobsUnlocked} and the ones which were
    added successfully are passed to L{_EnqueueJobsUnlocked}.

    @see: L{_SubmitJobUnlocked}
    @return: the per-job results as returned by L{_SubmitManyJobsUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(len(jobs))

    submitted = self._SubmitManyJobsUnlocked(jobs, new_ids, [])
    (results, added_jobs) = submitted

    self._EnqueueJobsUnlocked(added_jobs)

    return results
2249 2971c913 Iustin Pop
2250 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    @param msg: error message
    @param ops: opcodes of the job; C{Summary()} is called on each of them
    @rtype: string
    @return: the message followed by the opcode summaries, the latter
      joined using C{utils.CommaJoin}

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2257 b247c6fc Michael Hanselmann
2258 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    Entries whose job ID is not relative (according to C{ht.TRelativeJobId})
    are copied unchanged; relative ones are replaced by the value returned
    from C{resolve_fn}. Processing stops at the first relative ID for which
    C{resolve_fn} raises C{IndexError}.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies, as pairs of (job ID, status)
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    resolved = []

    for (dep_id, wanted_status) in deps:
      if not ht.TRelativeJobId(dep_id):
        # Not a relative ID, keep it as it is
        resolved.append((dep_id, wanted_status))
        continue

      assert ht.TInt(dep_id) and dep_id < 0

      try:
        absolute_id = resolve_fn(dep_id)
      except IndexError:
        # Abort
        return (False, "Unable to resolve relative job ID %s" % dep_id)

      resolved.append((absolute_id, wanted_status))

    return (True, resolved)
2288 b247c6fc Michael Hanselmann
2289 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2290 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2291 b247c6fc Michael Hanselmann

2292 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2293 b247c6fc Michael Hanselmann

2294 b247c6fc Michael Hanselmann
    """
2295 b247c6fc Michael Hanselmann
    results = []
2296 b247c6fc Michael Hanselmann
    added_jobs = []
2297 b247c6fc Michael Hanselmann
2298 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2299 b247c6fc Michael Hanselmann
      assert reljobid < 0
2300 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2301 b247c6fc Michael Hanselmann
2302 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2303 b247c6fc Michael Hanselmann
      for op in ops:
2304 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2305 b247c6fc Michael Hanselmann
          (status, data) = \
2306 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2307 b247c6fc Michael Hanselmann
                                         op.depends)
2308 b247c6fc Michael Hanselmann
          if not status:
2309 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2310 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2311 b247c6fc Michael Hanselmann
            break
2312 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2313 b247c6fc Michael Hanselmann
          op.depends = data
2314 b247c6fc Michael Hanselmann
      else:
2315 b247c6fc Michael Hanselmann
        try:
2316 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2317 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2318 b247c6fc Michael Hanselmann
          status = False
2319 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2320 b247c6fc Michael Hanselmann
        else:
2321 b247c6fc Michael Hanselmann
          status = True
2322 b247c6fc Michael Hanselmann
          data = job_id
2323 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2324 b247c6fc Michael Hanselmann
2325 b247c6fc Michael Hanselmann
      results.append((status, data))
2326 b247c6fc Michael Hanselmann
2327 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2328 b247c6fc Michael Hanselmann
2329 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Add jobs to the worker pool's queue.

    Same as L{_EnqueueJobsUnlocked}, but wrapped in
    C{locking.ssynchronized(_LOCK)}.

    @type jobs: list
    @param jobs: List of all jobs
    @return: the return value of L{_EnqueueJobsUnlocked}

    """
    outcome = self._EnqueueJobsUnlocked(jobs)
    return outcome
2338 75d81fc8 Michael Hanselmann
2339 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Add jobs to the worker pool's queue.

    Every job becomes one task whose only argument is the job itself; the
    task priority is the job's C{CalcPriority()} value and the task ID is
    obtained through C{_GetIdAttr}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    task_args = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]
    task_ids = map(_GetIdAttr, jobs)

    self._wpool.AddManyTasks(task_args, priority=priorities,
                             task_id=task_ids)
2350 7b5c4a69 Michael Hanselmann
2351 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2352 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2353 b95479a5 Michael Hanselmann

2354 76b62028 Iustin Pop
    @type job_id: int
2355 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2356 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2357 b95479a5 Michael Hanselmann

2358 b95479a5 Michael Hanselmann
    """
2359 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2360 b95479a5 Michael Hanselmann
2361 b95479a5 Michael Hanselmann
    # Try to load from disk
2362 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2363 c0f6d0d8 Michael Hanselmann
2364 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2365 b95479a5 Michael Hanselmann
2366 b95479a5 Michael Hanselmann
    if job:
2367 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2368 b95479a5 Michael Hanselmann
2369 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2370 b95479a5 Michael Hanselmann
2371 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Write a job's current state to its queue file.

    Has to be called after a job was modified so that the serialized job is
    written to disk and, depending on C{replicate}, replicated to the
    other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      has_end_timestamp = job.end_timestamp is not None
      # An end timestamp must be set if and only if the job is finalized
      assert is_finalized == has_end_timestamp
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())

    logging.debug("Writing job %s to %s", job.id, job_file)
    self._UpdateJobQueueFile(job_file, serialized, replicate)
2395 ac0930b9 Iustin Pop
2396 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual waiting is delegated to a C{_WaitForJobChangesHelper}
    instance, which is given the job's file path and a function loading the
    job read-only from disk (archived jobs included).

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    reload_job = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                                writable=False)

    wait_helper = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return wait_helper(job_path, reload_job, fields, prev_job_info,
                       prev_log_serial, timeout)
2427 dfe57c22 Michael Hanselmann
2428 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    The job's C{Cancel} method is applied through L{_ModifyJobUnlocked}.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @return: the result of L{_ModifyJobUnlocked}

    """
    logging.info("Cancelling job %s", job_id)

    def _CancelFn(job):
      return job.Cancel()

    return self._ModifyJobUnlocked(job_id, _CancelFn)
2442 aebd0e4e Michael Hanselmann
2443 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    The new priority is validated first. The change itself is applied via
    L{_ModifyJobUnlocked}; if the job accepts it, the priority of the
    corresponding worker pool task is adjusted as well (a task unknown to
    the worker pool is only logged).

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @return: the result of L{_ModifyJobUnlocked}
    @raise errors.GenericError: if the priority is not one of
      L{constants.OP_PRIO_SUBMIT_VALID}

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      valid_names = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, valid_names))

    def _ApplyPriority(job):
      (success, msg) = job.ChangePriority(priority)

      if not success:
        return (success, msg)

      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, _ApplyPriority)
2473 4679547e Michael Hanselmann
2474 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2475 aebd0e4e Michael Hanselmann
    """Modifies a job.
2476 aebd0e4e Michael Hanselmann

2477 aebd0e4e Michael Hanselmann
    @type job_id: int
2478 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2479 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2480 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2481 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2482 aebd0e4e Michael Hanselmann

2483 aebd0e4e Michael Hanselmann
    """
2484 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2485 188c5e0a Michael Hanselmann
    if not job:
2486 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2487 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2488 fbf0262f Michael Hanselmann
2489 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2490 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2491 c0f6d0d8 Michael Hanselmann
2492 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2493 188c5e0a Michael Hanselmann
2494 099b2870 Michael Hanselmann
    if success:
2495 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2496 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2497 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2498 fbf0262f Michael Hanselmann
2499 099b2870 Michael Hanselmann
    return (success, msg)
2500 fbf0262f Michael Hanselmann
2501 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Jobs whose status is not in L{constants.JOBS_FINALIZED} are skipped.
    For all others the job file is renamed from its queue path to its
    archive path. Afterwards the cached queue size is recomputed.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs; this is the number of jobs selected
      for archival, the result of the rename is not checked

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
2539 78d12585 Michael Hanselmann
2540 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; False if the job could not be loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived_count = self._ArchiveJobsUnlocked([job])
      return archived_count == 1

    logging.debug("Job %s not found", job_id)
    return False
2561 07cd723a Iustin Pop
2562 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and for jobs without a start timestamp
    the received timestamp is used. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    Jobs are examined in the order returned by C{_GetJobIDsUnlocked} and
    archived in batches of ten. Once more than C{timeout} seconds have
    passed no further jobs are examined; the candidates collected up to
    that point are still archived.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int or float
    @param timeout: time in seconds after which no further jobs are
      examined
    @rtype: tuple; (int, int)
    @return: number of archived jobs and number of jobs which were not
      examined because the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after the timeout check has passed;
      # otherwise the job at which we stopped would be reported as handled
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        # Only the first element of the timestamp is compared with the age
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2617 07cd723a Iustin Pop
2618 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it refers to.

    If the query names specific jobs only those are loaded, and a job which
    fails to load is still listed (with None as its job object). Otherwise
    all job IDs in the queue are used and jobs failing to load are left out.

    @param fields: requested fields, passed to C{query.Query}
    @param qfilter: query filter, passed to C{query.Query}
    @rtype: tuple
    @return: the query object, a list of (job ID, job object or None) pairs
      and a boolean telling whether all jobs were listed

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    wanted_ids = qobj.RequestedNames()
    list_all = (wanted_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      wanted_ids = self._GetJobIDsUnlocked(archived=include_archived)

    loaded = []

    for job_id in wanted_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is None and list_all:
        # Unloadable jobs are only reported when asked for explicitly
        continue
      loaded.append((job_id, job))

    return (qobj, loaded, list_all)
2644 e07f7f7a Michael Hanselmann
2645 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Runs a query over the jobs in the queue.

    The jobs are collected through L{_Query} and the response is built by
    C{query.GetQueryResponse}, without sorting by name.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the value returned by C{query.GetQueryResponse}

    """
    (qobj, job_data, _) = self._Query(fields, qfilter)

    response = query.GetQueryResponse(qobj, job_data, sort_by_name=False)

    return response
2657 e07f7f7a Michael Hanselmann
2658 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list or None
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # backwards compat: job identifiers are converted to integers
    # (presumably because older callers may pass them as strings). C{None}
    # is documented as "all jobs" and must not be iterated over, otherwise
    # the conversion raises TypeError before the query is even built.
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]

    # NOTE(review): with job_ids=None this relies on qlang.MakeSimpleFilter
    # accepting None as "no restriction" -- confirm against qlang.
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2677 e2715f69 Michael Hanselmann
2678 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    The function may be called repeatedly; the worker pool is only
    deactivated on the first call, later calls merely report whether tasks
    are still running.

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      # First call: stop accepting jobs before touching the worker pool
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
2702 6d5ea385 Michael Hanselmann
2703 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Tells whether the queue still accepts jobs.

    The flag is cleared by L{PrepareShutdown}; from then on no new jobs are
    accepted and the queue is shutting down.

    @rtype: bool
    @return: the current value of the "accepting jobs" flag

    """
    accepting = self._accepting_jobs

    return accepting
2713 942e2262 Michael Hanselmann
2714 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue: the worker
    pool is asked to terminate its workers, then the queue file lock is closed
    and the reference to it is dropped.

    """
    # Workers are terminated first, only afterwards is the lock released
    self._wpool.TerminateWorkers()

    filelock = self._queue_filelock
    filelock.Close()

    # Forget the closed lock object
    self._queue_filelock = None