Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ aebd0e4e

History | View | Annotate | Download (74.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 76b62028 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 b95479a5 Michael Hanselmann
from ganeti import ht
60 a06c6ae8 Michael Hanselmann
from ganeti import query
61 a06c6ae8 Michael Hanselmann
from ganeti import qlang
62 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
63 cffbbae7 Michael Hanselmann
from ganeti import vcluster
64 e2715f69 Michael Hanselmann
65 fbf0262f Michael Hanselmann
66 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
67 e2715f69 Michael Hanselmann
68 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
69 ebb80afa Guido Trotter
_LOCK = "_lock"
70 ebb80afa Guido Trotter
_QUEUE = "_queue"
71 99bd4f0a Guido Trotter
72 498ae1cc Iustin Pop
73 9728ae5d Iustin Pop
class CancelJob(Exception):
74 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
75 fbf0262f Michael Hanselmann

76 fbf0262f Michael Hanselmann
  """
77 fbf0262f Michael Hanselmann
78 fbf0262f Michael Hanselmann
79 70552c46 Michael Hanselmann
def TimeStampNow():
80 ea03467c Iustin Pop
  """Returns the current timestamp.
81 ea03467c Iustin Pop

82 ea03467c Iustin Pop
  @rtype: tuple
83 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
84 ea03467c Iustin Pop

85 ea03467c Iustin Pop
  """
86 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
87 70552c46 Michael Hanselmann
88 70552c46 Michael Hanselmann
89 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
90 cffbbae7 Michael Hanselmann
  """Updates job queue file after virtualizing filename.
91 cffbbae7 Michael Hanselmann

92 cffbbae7 Michael Hanselmann
  """
93 cffbbae7 Michael Hanselmann
  virt_file_name = vcluster.MakeVirtualPath(file_name)
94 cffbbae7 Michael Hanselmann
  return runner.call_jobqueue_update(names, virt_file_name, content)
95 cffbbae7 Michael Hanselmann
96 cffbbae7 Michael Hanselmann
97 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
98 a06c6ae8 Michael Hanselmann
  """Wrapper for job queries.
99 a06c6ae8 Michael Hanselmann

100 a06c6ae8 Michael Hanselmann
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
101 a06c6ae8 Michael Hanselmann

102 a06c6ae8 Michael Hanselmann
  """
103 a06c6ae8 Michael Hanselmann
  def __init__(self, fields):
104 a06c6ae8 Michael Hanselmann
    """Initializes this class.
105 a06c6ae8 Michael Hanselmann

106 a06c6ae8 Michael Hanselmann
    """
107 a06c6ae8 Michael Hanselmann
    self._query = query.Query(query.JOB_FIELDS, fields)
108 a06c6ae8 Michael Hanselmann
109 a06c6ae8 Michael Hanselmann
  def __call__(self, job):
110 a06c6ae8 Michael Hanselmann
    """Executes a job query using cached field list.
111 a06c6ae8 Michael Hanselmann

112 a06c6ae8 Michael Hanselmann
    """
113 a06c6ae8 Michael Hanselmann
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
114 a06c6ae8 Michael Hanselmann
115 a06c6ae8 Michael Hanselmann
116 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
117 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
118 e2715f69 Michael Hanselmann

119 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
120 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
121 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
122 ea03467c Iustin Pop
  @ivar status: the current status
123 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
124 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
125 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
126 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
127 f1048938 Iustin Pop

128 e2715f69 Michael Hanselmann
  """
129 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
130 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
131 66d895a8 Iustin Pop
               "__weakref__"]
132 66d895a8 Iustin Pop
133 85f03e0d Michael Hanselmann
  def __init__(self, op):
134 66abb9ff Michael Hanselmann
    """Initializes instances of this class.
135 ea03467c Iustin Pop

136 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
137 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
138 ea03467c Iustin Pop

139 ea03467c Iustin Pop
    """
140 85f03e0d Michael Hanselmann
    self.input = op
141 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
142 85f03e0d Michael Hanselmann
    self.result = None
143 85f03e0d Michael Hanselmann
    self.log = []
144 70552c46 Michael Hanselmann
    self.start_timestamp = None
145 b9b5abcb Iustin Pop
    self.exec_timestamp = None
146 70552c46 Michael Hanselmann
    self.end_timestamp = None
147 f1da30e6 Michael Hanselmann
148 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
149 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
150 8f5c488d Michael Hanselmann
151 f1da30e6 Michael Hanselmann
  @classmethod
152 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
153 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
154 ea03467c Iustin Pop

155 ea03467c Iustin Pop
    @type state: dict
156 ea03467c Iustin Pop
    @param state: the serialized state
157 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
158 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
159 ea03467c Iustin Pop

160 ea03467c Iustin Pop
    """
161 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
162 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
163 85f03e0d Michael Hanselmann
    obj.status = state["status"]
164 85f03e0d Michael Hanselmann
    obj.result = state["result"]
165 85f03e0d Michael Hanselmann
    obj.log = state["log"]
166 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
167 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
168 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
169 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
170 f1da30e6 Michael Hanselmann
    return obj
171 f1da30e6 Michael Hanselmann
172 f1da30e6 Michael Hanselmann
  def Serialize(self):
173 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
174 ea03467c Iustin Pop

175 ea03467c Iustin Pop
    @rtype: dict
176 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
177 ea03467c Iustin Pop

178 ea03467c Iustin Pop
    """
179 6c5a7090 Michael Hanselmann
    return {
180 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
181 6c5a7090 Michael Hanselmann
      "status": self.status,
182 6c5a7090 Michael Hanselmann
      "result": self.result,
183 6c5a7090 Michael Hanselmann
      "log": self.log,
184 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
185 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
186 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
187 8f5c488d Michael Hanselmann
      "priority": self.priority,
188 6c5a7090 Michael Hanselmann
      }
189 f1048938 Iustin Pop
190 e2715f69 Michael Hanselmann
191 e2715f69 Michael Hanselmann
class _QueuedJob(object):
192 e2715f69 Michael Hanselmann
  """In-memory job representation.
193 e2715f69 Michael Hanselmann

194 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
195 ea03467c Iustin Pop
  be taken care of by users of this class.
196 ea03467c Iustin Pop

197 ea03467c Iustin Pop
  @type queue: L{JobQueue}
198 ea03467c Iustin Pop
  @ivar queue: the parent queue
199 ea03467c Iustin Pop
  @ivar id: the job ID
200 ea03467c Iustin Pop
  @type ops: list
201 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
202 ea03467c Iustin Pop
  @type log_serial: int
203 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
204 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
205 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
206 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
207 c0f6d0d8 Michael Hanselmann
  @ivar writable: Whether the job is allowed to be modified
208 e2715f69 Michael Hanselmann

209 e2715f69 Michael Hanselmann
  """
210 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
211 26d3fd2f Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
212 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
213 8a3cd185 Michael Hanselmann
               "__weakref__", "processor_lock", "writable", "archived"]
214 66d895a8 Iustin Pop
215 c0f6d0d8 Michael Hanselmann
  def __init__(self, queue, job_id, ops, writable):
216 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
217 ea03467c Iustin Pop

218 ea03467c Iustin Pop
    @type queue: L{JobQueue}
219 ea03467c Iustin Pop
    @param queue: our parent queue
220 ea03467c Iustin Pop
    @type job_id: job_id
221 ea03467c Iustin Pop
    @param job_id: our job id
222 ea03467c Iustin Pop
    @type ops: list
223 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
224 ea03467c Iustin Pop
        in _QueuedOpCodes
225 c0f6d0d8 Michael Hanselmann
    @type writable: bool
226 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
227 ea03467c Iustin Pop

228 ea03467c Iustin Pop
    """
229 e2715f69 Michael Hanselmann
    if not ops:
230 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
231 e2715f69 Michael Hanselmann
232 85f03e0d Michael Hanselmann
    self.queue = queue
233 76b62028 Iustin Pop
    self.id = int(job_id)
234 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
235 6c5a7090 Michael Hanselmann
    self.log_serial = 0
236 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
237 c56ec146 Iustin Pop
    self.start_timestamp = None
238 c56ec146 Iustin Pop
    self.end_timestamp = None
239 8a3cd185 Michael Hanselmann
    self.archived = False
240 6c5a7090 Michael Hanselmann
241 c0f6d0d8 Michael Hanselmann
    self._InitInMemory(self, writable)
242 fa4aa6b4 Michael Hanselmann
243 8a3cd185 Michael Hanselmann
    assert not self.archived, "New jobs can not be marked as archived"
244 8a3cd185 Michael Hanselmann
245 fa4aa6b4 Michael Hanselmann
  @staticmethod
246 c0f6d0d8 Michael Hanselmann
  def _InitInMemory(obj, writable):
247 fa4aa6b4 Michael Hanselmann
    """Initializes in-memory variables.
248 fa4aa6b4 Michael Hanselmann

249 fa4aa6b4 Michael Hanselmann
    """
250 c0f6d0d8 Michael Hanselmann
    obj.writable = writable
251 03b63608 Michael Hanselmann
    obj.ops_iter = None
252 26d3fd2f Michael Hanselmann
    obj.cur_opctx = None
253 f8a4adfa Michael Hanselmann
254 f8a4adfa Michael Hanselmann
    # Read-only jobs are not processed and therefore don't need a lock
255 f8a4adfa Michael Hanselmann
    if writable:
256 f8a4adfa Michael Hanselmann
      obj.processor_lock = threading.Lock()
257 f8a4adfa Michael Hanselmann
    else:
258 f8a4adfa Michael Hanselmann
      obj.processor_lock = None
259 be760ba8 Michael Hanselmann
260 9fa2e150 Michael Hanselmann
  def __repr__(self):
261 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
262 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
263 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
264 9fa2e150 Michael Hanselmann
265 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
266 9fa2e150 Michael Hanselmann
267 f1da30e6 Michael Hanselmann
  @classmethod
268 8a3cd185 Michael Hanselmann
  def Restore(cls, queue, state, writable, archived):
269 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
270 ea03467c Iustin Pop

271 ea03467c Iustin Pop
    @type queue: L{JobQueue}
272 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
273 ea03467c Iustin Pop
    @type state: dict
274 ea03467c Iustin Pop
    @param state: the serialized state
275 c0f6d0d8 Michael Hanselmann
    @type writable: bool
276 c0f6d0d8 Michael Hanselmann
    @param writable: Whether job can be modified
277 8a3cd185 Michael Hanselmann
    @type archived: bool
278 8a3cd185 Michael Hanselmann
    @param archived: Whether job was already archived
279 ea03467c Iustin Pop
    @rtype: _JobQueue
280 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
281 ea03467c Iustin Pop

282 ea03467c Iustin Pop
    """
283 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
284 85f03e0d Michael Hanselmann
    obj.queue = queue
285 76b62028 Iustin Pop
    obj.id = int(state["id"])
286 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
287 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
288 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
289 8a3cd185 Michael Hanselmann
    obj.archived = archived
290 6c5a7090 Michael Hanselmann
291 6c5a7090 Michael Hanselmann
    obj.ops = []
292 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
293 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
294 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
295 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
296 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
297 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
298 6c5a7090 Michael Hanselmann
299 c0f6d0d8 Michael Hanselmann
    cls._InitInMemory(obj, writable)
300 be760ba8 Michael Hanselmann
301 f1da30e6 Michael Hanselmann
    return obj
302 f1da30e6 Michael Hanselmann
303 f1da30e6 Michael Hanselmann
  def Serialize(self):
304 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
305 ea03467c Iustin Pop

306 ea03467c Iustin Pop
    @rtype: dict
307 ea03467c Iustin Pop
    @return: the serialized state
308 ea03467c Iustin Pop

309 ea03467c Iustin Pop
    """
310 f1da30e6 Michael Hanselmann
    return {
311 f1da30e6 Michael Hanselmann
      "id": self.id,
312 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
313 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
314 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
315 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
316 f1da30e6 Michael Hanselmann
      }
317 f1da30e6 Michael Hanselmann
318 85f03e0d Michael Hanselmann
  def CalcStatus(self):
319 ea03467c Iustin Pop
    """Compute the status of this job.
320 ea03467c Iustin Pop

321 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
322 ea03467c Iustin Pop
    based on their status, computes the job status.
323 ea03467c Iustin Pop

324 ea03467c Iustin Pop
    The algorithm is:
325 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
326 ea03467c Iustin Pop
        status will be the same
327 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
328 ea03467c Iustin Pop
          - waitlock
329 fbf0262f Michael Hanselmann
          - canceling
330 ea03467c Iustin Pop
          - running
331 ea03467c Iustin Pop

332 ea03467c Iustin Pop
        will determine the job status
333 ea03467c Iustin Pop

334 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
335 ea03467c Iustin Pop
        and the job status will be the same
336 ea03467c Iustin Pop

337 ea03467c Iustin Pop
    @return: the job status
338 ea03467c Iustin Pop

339 ea03467c Iustin Pop
    """
340 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
341 e2715f69 Michael Hanselmann
342 e2715f69 Michael Hanselmann
    all_success = True
343 85f03e0d Michael Hanselmann
    for op in self.ops:
344 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
345 e2715f69 Michael Hanselmann
        continue
346 e2715f69 Michael Hanselmann
347 e2715f69 Michael Hanselmann
      all_success = False
348 e2715f69 Michael Hanselmann
349 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
350 e2715f69 Michael Hanselmann
        pass
351 47099cd1 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING:
352 47099cd1 Michael Hanselmann
        status = constants.JOB_STATUS_WAITING
353 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
354 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
355 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
356 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
357 fbf0262f Michael Hanselmann
        break
358 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
359 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
360 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
361 f1da30e6 Michael Hanselmann
        break
362 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
363 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
364 4cb1d919 Michael Hanselmann
        break
365 e2715f69 Michael Hanselmann
366 e2715f69 Michael Hanselmann
    if all_success:
367 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
368 e2715f69 Michael Hanselmann
369 e2715f69 Michael Hanselmann
    return status
370 e2715f69 Michael Hanselmann
371 8f5c488d Michael Hanselmann
  def CalcPriority(self):
372 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
373 8f5c488d Michael Hanselmann

374 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
375 8f5c488d Michael Hanselmann
    priority is used.
376 8f5c488d Michael Hanselmann

377 8f5c488d Michael Hanselmann
    @rtype: int
378 8f5c488d Michael Hanselmann

379 8f5c488d Michael Hanselmann
    """
380 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
381 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
382 8f5c488d Michael Hanselmann
383 8f5c488d Michael Hanselmann
    if not priorities:
384 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
385 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
386 8f5c488d Michael Hanselmann
387 8f5c488d Michael Hanselmann
    return min(priorities)
388 8f5c488d Michael Hanselmann
389 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
390 ea03467c Iustin Pop
    """Selectively returns the log entries.
391 ea03467c Iustin Pop

392 ea03467c Iustin Pop
    @type newer_than: None or int
393 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
394 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
395 ea03467c Iustin Pop
        than this value
396 ea03467c Iustin Pop
    @rtype: list
397 ea03467c Iustin Pop
    @return: the list of the log entries selected
398 ea03467c Iustin Pop

399 ea03467c Iustin Pop
    """
400 6c5a7090 Michael Hanselmann
    if newer_than is None:
401 6c5a7090 Michael Hanselmann
      serial = -1
402 6c5a7090 Michael Hanselmann
    else:
403 6c5a7090 Michael Hanselmann
      serial = newer_than
404 6c5a7090 Michael Hanselmann
405 6c5a7090 Michael Hanselmann
    entries = []
406 6c5a7090 Michael Hanselmann
    for op in self.ops:
407 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
408 6c5a7090 Michael Hanselmann
409 6c5a7090 Michael Hanselmann
    return entries
410 6c5a7090 Michael Hanselmann
411 6a290889 Guido Trotter
  def GetInfo(self, fields):
412 6a290889 Guido Trotter
    """Returns information about a job.
413 6a290889 Guido Trotter

414 6a290889 Guido Trotter
    @type fields: list
415 6a290889 Guido Trotter
    @param fields: names of fields to return
416 6a290889 Guido Trotter
    @rtype: list
417 6a290889 Guido Trotter
    @return: list with one element for each field
418 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
419 6a290889 Guido Trotter
        has been passed
420 6a290889 Guido Trotter

421 6a290889 Guido Trotter
    """
422 a06c6ae8 Michael Hanselmann
    return _SimpleJobQuery(fields)(self)
423 6a290889 Guido Trotter
424 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
425 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
426 34327f51 Iustin Pop

427 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
428 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
429 34327f51 Iustin Pop
    finalised are not changed.
430 34327f51 Iustin Pop

431 34327f51 Iustin Pop
    @param status: a given opcode status
432 34327f51 Iustin Pop
    @param result: the opcode result
433 34327f51 Iustin Pop

434 34327f51 Iustin Pop
    """
435 747f6113 Michael Hanselmann
    not_marked = True
436 747f6113 Michael Hanselmann
    for op in self.ops:
437 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
438 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
439 747f6113 Michael Hanselmann
        continue
440 747f6113 Michael Hanselmann
      op.status = status
441 747f6113 Michael Hanselmann
      op.result = result
442 747f6113 Michael Hanselmann
      not_marked = False
443 34327f51 Iustin Pop
444 66bd7445 Michael Hanselmann
  def Finalize(self):
445 66bd7445 Michael Hanselmann
    """Marks the job as finalized.
446 66bd7445 Michael Hanselmann

447 66bd7445 Michael Hanselmann
    """
448 66bd7445 Michael Hanselmann
    self.end_timestamp = TimeStampNow()
449 66bd7445 Michael Hanselmann
450 099b2870 Michael Hanselmann
  def Cancel(self):
451 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
452 a0d2fe2c Michael Hanselmann

453 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
454 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
455 a0d2fe2c Michael Hanselmann
      as canceling and a text message
456 a0d2fe2c Michael Hanselmann

457 a0d2fe2c Michael Hanselmann
    """
458 099b2870 Michael Hanselmann
    status = self.CalcStatus()
459 099b2870 Michael Hanselmann
460 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
461 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
462 099b2870 Michael Hanselmann
                             "Job canceled by request")
463 66bd7445 Michael Hanselmann
      self.Finalize()
464 86b16e9d Michael Hanselmann
      return (True, "Job %s canceled" % self.id)
465 099b2870 Michael Hanselmann
466 47099cd1 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITING:
467 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
468 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
469 86b16e9d Michael Hanselmann
      return (True, "Job %s will be canceled" % self.id)
470 099b2870 Michael Hanselmann
471 86b16e9d Michael Hanselmann
    else:
472 86b16e9d Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
473 86b16e9d Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
474 099b2870 Michael Hanselmann
475 f1048938 Iustin Pop
476 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
477 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
478 031a3e57 Michael Hanselmann
    """Initializes this class.
479 ea03467c Iustin Pop

480 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
481 031a3e57 Michael Hanselmann
    @param queue: Job queue
482 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
483 031a3e57 Michael Hanselmann
    @param job: Job object
484 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
485 031a3e57 Michael Hanselmann
    @param op: OpCode
486 031a3e57 Michael Hanselmann

487 031a3e57 Michael Hanselmann
    """
488 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
489 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
490 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
491 031a3e57 Michael Hanselmann
492 031a3e57 Michael Hanselmann
    self._queue = queue
493 031a3e57 Michael Hanselmann
    self._job = job
494 031a3e57 Michael Hanselmann
    self._op = op
495 031a3e57 Michael Hanselmann
496 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
497 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
498 dc1e2262 Michael Hanselmann

499 dc1e2262 Michael Hanselmann
    """
500 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
501 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
502 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
503 dc1e2262 Michael Hanselmann
      raise CancelJob()
504 dc1e2262 Michael Hanselmann
505 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
506 031a3e57 Michael Hanselmann
  def NotifyStart(self):
507 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
508 e92376d7 Iustin Pop

509 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
510 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
511 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
512 47099cd1 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
513 e92376d7 Iustin Pop

514 e92376d7 Iustin Pop
    """
515 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
516 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
517 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
518 fbf0262f Michael Hanselmann
519 271daef8 Iustin Pop
    # Cancel here if we were asked to
520 dc1e2262 Michael Hanselmann
    self._CheckCancel()
521 fbf0262f Michael Hanselmann
522 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
523 9bdab621 Michael Hanselmann
524 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
525 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
526 271daef8 Iustin Pop
527 271daef8 Iustin Pop
    # And finally replicate the job status
528 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
529 031a3e57 Michael Hanselmann
530 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
531 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
532 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
533 9bf5e01f Guido Trotter

534 9bf5e01f Guido Trotter
    """
535 9bf5e01f Guido Trotter
    self._job.log_serial += 1
536 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
537 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
538 9bf5e01f Guido Trotter
539 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
540 031a3e57 Michael Hanselmann
    """Append a log entry.
541 031a3e57 Michael Hanselmann

542 031a3e57 Michael Hanselmann
    """
543 031a3e57 Michael Hanselmann
    assert len(args) < 3
544 031a3e57 Michael Hanselmann
545 031a3e57 Michael Hanselmann
    if len(args) == 1:
546 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
547 031a3e57 Michael Hanselmann
      log_msg = args[0]
548 031a3e57 Michael Hanselmann
    else:
549 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
550 031a3e57 Michael Hanselmann
551 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
552 031a3e57 Michael Hanselmann
    # precision.
553 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
554 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
555 031a3e57 Michael Hanselmann
556 acf931b7 Michael Hanselmann
  def CheckCancel(self):
557 acf931b7 Michael Hanselmann
    """Check whether job has been cancelled.
558 ef2df7d3 Michael Hanselmann

559 ef2df7d3 Michael Hanselmann
    """
560 47099cd1 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITING,
561 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
562 dc1e2262 Michael Hanselmann
563 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
564 dc1e2262 Michael Hanselmann
    self._CheckCancel()
565 dc1e2262 Michael Hanselmann
566 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
567 6a373640 Michael Hanselmann
    """Submits jobs for processing.
568 6a373640 Michael Hanselmann

569 6a373640 Michael Hanselmann
    See L{JobQueue.SubmitManyJobs}.
570 6a373640 Michael Hanselmann

571 6a373640 Michael Hanselmann
    """
572 6a373640 Michael Hanselmann
    # Locking is done in job queue
573 6a373640 Michael Hanselmann
    return self._queue.SubmitManyJobs(jobs)
574 6a373640 Michael Hanselmann
575 031a3e57 Michael Hanselmann
576 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
577 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
578 989a8bee Michael Hanselmann
    """Initializes this class.
579 6c2549d6 Guido Trotter

580 989a8bee Michael Hanselmann
    @type fields: list of strings
581 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
582 989a8bee Michael Hanselmann
    @type prev_job_info: string
583 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
584 989a8bee Michael Hanselmann
    @type prev_log_serial: string
585 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
586 6c2549d6 Guido Trotter

587 989a8bee Michael Hanselmann
    """
588 dc2879ea Michael Hanselmann
    self._squery = _SimpleJobQuery(fields)
589 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
590 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
591 6c2549d6 Guido Trotter
592 989a8bee Michael Hanselmann
  def __call__(self, job):
593 989a8bee Michael Hanselmann
    """Checks whether job has changed.
594 6c2549d6 Guido Trotter

595 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
596 989a8bee Michael Hanselmann
    @param job: Job object
597 6c2549d6 Guido Trotter

598 6c2549d6 Guido Trotter
    """
599 c0f6d0d8 Michael Hanselmann
    assert not job.writable, "Expected read-only job"
600 c0f6d0d8 Michael Hanselmann
601 989a8bee Michael Hanselmann
    status = job.CalcStatus()
602 dc2879ea Michael Hanselmann
    job_info = self._squery(job)
603 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
604 6c2549d6 Guido Trotter
605 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
606 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
607 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
608 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
609 6c2549d6 Guido Trotter
    # significantly different.
610 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
611 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
612 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
613 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
614 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
615 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
616 6c2549d6 Guido Trotter
617 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
618 6c2549d6 Guido Trotter
    # no changes.
619 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
620 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
621 47099cd1 Michael Hanselmann
                       constants.JOB_STATUS_WAITING) or
622 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
623 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
624 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
625 989a8bee Michael Hanselmann
      return (job_info, log_entries)
626 6c2549d6 Guido Trotter
627 989a8bee Michael Hanselmann
    return None
628 989a8bee Michael Hanselmann
629 989a8bee Michael Hanselmann
630 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
631 989a8bee Michael Hanselmann
  def __init__(self, filename):
632 989a8bee Michael Hanselmann
    """Initializes this class.
633 989a8bee Michael Hanselmann

634 989a8bee Michael Hanselmann
    @type filename: string
635 989a8bee Michael Hanselmann
    @param filename: Path to job file
636 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
637 6c2549d6 Guido Trotter

638 989a8bee Michael Hanselmann
    """
639 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
640 989a8bee Michael Hanselmann
    self._inotify_handler = \
641 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
642 989a8bee Michael Hanselmann
    self._notifier = \
643 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
644 989a8bee Michael Hanselmann
    try:
645 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
646 989a8bee Michael Hanselmann
    except Exception:
647 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
648 989a8bee Michael Hanselmann
      self._notifier.stop()
649 989a8bee Michael Hanselmann
      raise
650 989a8bee Michael Hanselmann
651 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
652 989a8bee Michael Hanselmann
    """Callback for inotify.
653 989a8bee Michael Hanselmann

654 989a8bee Michael Hanselmann
    """
655 6c2549d6 Guido Trotter
    if not notifier_enabled:
656 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
657 989a8bee Michael Hanselmann
658 989a8bee Michael Hanselmann
  def Wait(self, timeout):
659 989a8bee Michael Hanselmann
    """Waits for the job file to change.
660 989a8bee Michael Hanselmann

661 989a8bee Michael Hanselmann
    @type timeout: float
662 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
663 989a8bee Michael Hanselmann
    @return: Whether there have been events
664 989a8bee Michael Hanselmann

665 989a8bee Michael Hanselmann
    """
666 989a8bee Michael Hanselmann
    assert timeout >= 0
667 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
668 989a8bee Michael Hanselmann
    if have_events:
669 989a8bee Michael Hanselmann
      self._notifier.read_events()
670 989a8bee Michael Hanselmann
    self._notifier.process_events()
671 989a8bee Michael Hanselmann
    return have_events
672 989a8bee Michael Hanselmann
673 989a8bee Michael Hanselmann
  def Close(self):
674 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
675 989a8bee Michael Hanselmann

676 989a8bee Michael Hanselmann
    """
677 989a8bee Michael Hanselmann
    self._notifier.stop()
678 989a8bee Michael Hanselmann
679 989a8bee Michael Hanselmann
680 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
681 989a8bee Michael Hanselmann
  def __init__(self, filename):
682 989a8bee Michael Hanselmann
    """Initializes this class.
683 989a8bee Michael Hanselmann

684 989a8bee Michael Hanselmann
    @type filename: string
685 989a8bee Michael Hanselmann
    @param filename: Path to job file
686 989a8bee Michael Hanselmann

687 989a8bee Michael Hanselmann
    """
688 989a8bee Michael Hanselmann
    self._filewaiter = None
689 989a8bee Michael Hanselmann
    self._filename = filename
690 6c2549d6 Guido Trotter
691 989a8bee Michael Hanselmann
  def Wait(self, timeout):
692 989a8bee Michael Hanselmann
    """Waits for a job to change.
693 6c2549d6 Guido Trotter

694 989a8bee Michael Hanselmann
    @type timeout: float
695 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
696 989a8bee Michael Hanselmann
    @return: Whether there have been events
697 989a8bee Michael Hanselmann

698 989a8bee Michael Hanselmann
    """
699 989a8bee Michael Hanselmann
    if self._filewaiter:
700 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
701 989a8bee Michael Hanselmann
702 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
703 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
704 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
705 989a8bee Michael Hanselmann
    # race condition.
706 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
707 989a8bee Michael Hanselmann
708 989a8bee Michael Hanselmann
    return True
709 989a8bee Michael Hanselmann
710 989a8bee Michael Hanselmann
  def Close(self):
711 989a8bee Michael Hanselmann
    """Closes underlying waiter.
712 989a8bee Michael Hanselmann

713 989a8bee Michael Hanselmann
    """
714 989a8bee Michael Hanselmann
    if self._filewaiter:
715 989a8bee Michael Hanselmann
      self._filewaiter.Close()
716 989a8bee Michael Hanselmann
717 989a8bee Michael Hanselmann
718 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
719 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
720 989a8bee Michael Hanselmann

721 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
722 989a8bee Michael Hanselmann
  the current job status has changed.
723 989a8bee Michael Hanselmann

724 989a8bee Michael Hanselmann
  """
725 989a8bee Michael Hanselmann
  @staticmethod
726 dfc8824a Michael Hanselmann
  def _CheckForChanges(counter, job_load_fn, check_fn):
727 dfc8824a Michael Hanselmann
    if counter.next() > 0:
728 dfc8824a Michael Hanselmann
      # If this isn't the first check the job is given some more time to change
729 dfc8824a Michael Hanselmann
      # again. This gives better performance for jobs generating many
730 dfc8824a Michael Hanselmann
      # changes/messages.
731 dfc8824a Michael Hanselmann
      time.sleep(0.1)
732 dfc8824a Michael Hanselmann
733 989a8bee Michael Hanselmann
    job = job_load_fn()
734 989a8bee Michael Hanselmann
    if not job:
735 989a8bee Michael Hanselmann
      raise errors.JobLost()
736 989a8bee Michael Hanselmann
737 989a8bee Michael Hanselmann
    result = check_fn(job)
738 989a8bee Michael Hanselmann
    if result is None:
739 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
740 989a8bee Michael Hanselmann
741 989a8bee Michael Hanselmann
    return result
742 989a8bee Michael Hanselmann
743 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
744 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
745 989a8bee Michael Hanselmann
    """Waits for changes on a job.
746 989a8bee Michael Hanselmann

747 989a8bee Michael Hanselmann
    @type filename: string
748 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
749 989a8bee Michael Hanselmann
    @type job_load_fn: callable
750 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
751 989a8bee Michael Hanselmann
    @type fields: list of strings
752 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
753 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
754 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
755 989a8bee Michael Hanselmann
    @type prev_log_serial: int
756 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
757 989a8bee Michael Hanselmann
    @type timeout: float
758 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
759 989a8bee Michael Hanselmann

760 989a8bee Michael Hanselmann
    """
761 dfc8824a Michael Hanselmann
    counter = itertools.count()
762 6c2549d6 Guido Trotter
    try:
763 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
764 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
765 989a8bee Michael Hanselmann
      try:
766 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
767 dfc8824a Michael Hanselmann
                                          counter, job_load_fn, check_fn),
768 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
769 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
770 989a8bee Michael Hanselmann
      finally:
771 989a8bee Michael Hanselmann
        waiter.Close()
772 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
773 6c2549d6 Guido Trotter
      return None
774 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
775 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
776 6c2549d6 Guido Trotter
777 6c2549d6 Guido Trotter
778 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
779 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
780 6760e4ed Michael Hanselmann

781 6760e4ed Michael Hanselmann
  """
782 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
783 6760e4ed Michael Hanselmann
    to_encode = err
784 6760e4ed Michael Hanselmann
  else:
785 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
786 6760e4ed Michael Hanselmann
787 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
788 6760e4ed Michael Hanselmann
789 6760e4ed Michael Hanselmann
790 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
791 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
792 26d3fd2f Michael Hanselmann
    """Initializes this class.
793 26d3fd2f Michael Hanselmann

794 26d3fd2f Michael Hanselmann
    """
795 26d3fd2f Michael Hanselmann
    self._fn = fn
796 26d3fd2f Michael Hanselmann
    self._next = None
797 26d3fd2f Michael Hanselmann
798 26d3fd2f Michael Hanselmann
  def _Advance(self):
799 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
800 26d3fd2f Michael Hanselmann

801 26d3fd2f Michael Hanselmann
    """
802 26d3fd2f Michael Hanselmann
    if self._next is None:
803 26d3fd2f Michael Hanselmann
      self._next = self._fn()
804 26d3fd2f Michael Hanselmann
805 26d3fd2f Michael Hanselmann
  def Peek(self):
806 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
807 26d3fd2f Michael Hanselmann

808 26d3fd2f Michael Hanselmann
    """
809 26d3fd2f Michael Hanselmann
    self._Advance()
810 26d3fd2f Michael Hanselmann
    return self._next
811 26d3fd2f Michael Hanselmann
812 26d3fd2f Michael Hanselmann
  def Next(self):
813 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
814 26d3fd2f Michael Hanselmann

815 26d3fd2f Michael Hanselmann
    """
816 26d3fd2f Michael Hanselmann
    self._Advance()
817 26d3fd2f Michael Hanselmann
    result = self._next
818 26d3fd2f Michael Hanselmann
    self._next = None
819 26d3fd2f Michael Hanselmann
    return result
820 26d3fd2f Michael Hanselmann
821 26d3fd2f Michael Hanselmann
822 b80cc518 Michael Hanselmann
class _OpExecContext:
823 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
824 b80cc518 Michael Hanselmann
    """Initializes this class.
825 b80cc518 Michael Hanselmann

826 b80cc518 Michael Hanselmann
    """
827 b80cc518 Michael Hanselmann
    self.op = op
828 b80cc518 Michael Hanselmann
    self.index = index
829 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
830 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
831 b80cc518 Michael Hanselmann
832 b95479a5 Michael Hanselmann
    # Create local copy to modify
833 b95479a5 Michael Hanselmann
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
834 b95479a5 Michael Hanselmann
      self.jobdeps = op.input.depends[:]
835 b95479a5 Michael Hanselmann
    else:
836 b95479a5 Michael Hanselmann
      self.jobdeps = None
837 b95479a5 Michael Hanselmann
838 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
839 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
840 26d3fd2f Michael Hanselmann
841 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
842 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
843 26d3fd2f Michael Hanselmann

844 26d3fd2f Michael Hanselmann
    """
845 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
846 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
847 26d3fd2f Michael Hanselmann
848 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
849 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
850 26d3fd2f Michael Hanselmann

851 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
852 26d3fd2f Michael Hanselmann

853 26d3fd2f Michael Hanselmann
    """
854 26d3fd2f Michael Hanselmann
    op = self.op
855 26d3fd2f Michael Hanselmann
856 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
857 26d3fd2f Michael Hanselmann
    # for locks?
858 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
859 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
860 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
861 26d3fd2f Michael Hanselmann
      op.priority -= 1
862 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
863 26d3fd2f Michael Hanselmann
      return True
864 26d3fd2f Michael Hanselmann
865 26d3fd2f Michael Hanselmann
    return False
866 26d3fd2f Michael Hanselmann
867 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
868 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
869 26d3fd2f Michael Hanselmann

870 26d3fd2f Michael Hanselmann
    """
871 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
872 26d3fd2f Michael Hanselmann
873 b80cc518 Michael Hanselmann
874 be760ba8 Michael Hanselmann
class _JobProcessor(object):
875 75d81fc8 Michael Hanselmann
  (DEFER,
876 75d81fc8 Michael Hanselmann
   WAITDEP,
877 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
878 75d81fc8 Michael Hanselmann
879 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
880 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
881 be760ba8 Michael Hanselmann
    """Initializes this class.
882 be760ba8 Michael Hanselmann

883 be760ba8 Michael Hanselmann
    """
884 be760ba8 Michael Hanselmann
    self.queue = queue
885 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
886 be760ba8 Michael Hanselmann
    self.job = job
887 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
888 be760ba8 Michael Hanselmann
889 be760ba8 Michael Hanselmann
  @staticmethod
890 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
891 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
892 be760ba8 Michael Hanselmann

893 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
894 be760ba8 Michael Hanselmann
    @param job: Job object
895 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
896 be760ba8 Michael Hanselmann

897 be760ba8 Michael Hanselmann
    """
898 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
899 be760ba8 Michael Hanselmann
    # lookups
900 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
901 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
902 03b63608 Michael Hanselmann
    if job.ops_iter is None:
903 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
904 be760ba8 Michael Hanselmann
905 be760ba8 Michael Hanselmann
    # Find next opcode to run
906 be760ba8 Michael Hanselmann
    while True:
907 be760ba8 Michael Hanselmann
      try:
908 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
909 be760ba8 Michael Hanselmann
      except StopIteration:
910 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
911 be760ba8 Michael Hanselmann
912 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
913 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
914 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
915 be760ba8 Michael Hanselmann
916 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
917 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
918 be760ba8 Michael Hanselmann
919 66bd7445 Michael Hanselmann
      if op.status not in constants.OPS_FINALIZED:
920 66bd7445 Michael Hanselmann
        return opctx
921 be760ba8 Michael Hanselmann
922 66bd7445 Michael Hanselmann
      # This is a job that was partially completed before master daemon
923 66bd7445 Michael Hanselmann
      # shutdown, so it can be expected that some opcodes are already
924 66bd7445 Michael Hanselmann
      # completed successfully (if any did error out, then the whole job
925 66bd7445 Michael Hanselmann
      # should have been aborted and not resubmitted for processing).
926 66bd7445 Michael Hanselmann
      logging.info("%s: opcode %s already processed, skipping",
927 66bd7445 Michael Hanselmann
                   opctx.log_prefix, opctx.summary)
928 be760ba8 Michael Hanselmann
929 be760ba8 Michael Hanselmann
  @staticmethod
930 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
931 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
932 be760ba8 Michael Hanselmann

933 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
934 be760ba8 Michael Hanselmann

935 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
936 be760ba8 Michael Hanselmann
    @param job: Job object
937 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
938 a38e8674 Michael Hanselmann
    @param op: Opcode object
939 be760ba8 Michael Hanselmann

940 be760ba8 Michael Hanselmann
    """
941 be760ba8 Michael Hanselmann
    assert op in job.ops
942 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
943 47099cd1 Michael Hanselmann
                         constants.OP_STATUS_WAITING)
944 5fd6b694 Michael Hanselmann
945 5fd6b694 Michael Hanselmann
    update = False
946 be760ba8 Michael Hanselmann
947 be760ba8 Michael Hanselmann
    op.result = None
948 5fd6b694 Michael Hanselmann
949 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
950 47099cd1 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITING
951 5fd6b694 Michael Hanselmann
      update = True
952 5fd6b694 Michael Hanselmann
953 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
954 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
955 5fd6b694 Michael Hanselmann
      update = True
956 be760ba8 Michael Hanselmann
957 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
958 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
959 5fd6b694 Michael Hanselmann
      update = True
960 5fd6b694 Michael Hanselmann
961 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
962 5fd6b694 Michael Hanselmann
963 5fd6b694 Michael Hanselmann
    return update
964 be760ba8 Michael Hanselmann
965 b95479a5 Michael Hanselmann
  @staticmethod
966 b95479a5 Michael Hanselmann
  def _CheckDependencies(queue, job, opctx):
967 b95479a5 Michael Hanselmann
    """Checks if an opcode has dependencies and if so, processes them.
968 b95479a5 Michael Hanselmann

969 b95479a5 Michael Hanselmann
    @type queue: L{JobQueue}
970 b95479a5 Michael Hanselmann
    @param queue: Queue object
971 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
972 b95479a5 Michael Hanselmann
    @param job: Job object
973 b95479a5 Michael Hanselmann
    @type opctx: L{_OpExecContext}
974 b95479a5 Michael Hanselmann
    @param opctx: Opcode execution context
975 b95479a5 Michael Hanselmann
    @rtype: bool
976 b95479a5 Michael Hanselmann
    @return: Whether opcode will be re-scheduled by dependency tracker
977 b95479a5 Michael Hanselmann

978 b95479a5 Michael Hanselmann
    """
979 b95479a5 Michael Hanselmann
    op = opctx.op
980 b95479a5 Michael Hanselmann
981 b95479a5 Michael Hanselmann
    result = False
982 b95479a5 Michael Hanselmann
983 b95479a5 Michael Hanselmann
    while opctx.jobdeps:
984 b95479a5 Michael Hanselmann
      (dep_job_id, dep_status) = opctx.jobdeps[0]
985 b95479a5 Michael Hanselmann
986 b95479a5 Michael Hanselmann
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
987 b95479a5 Michael Hanselmann
                                                          dep_status)
988 b95479a5 Michael Hanselmann
      assert ht.TNonEmptyString(depmsg), "No dependency message"
989 b95479a5 Michael Hanselmann
990 b95479a5 Michael Hanselmann
      logging.info("%s: %s", opctx.log_prefix, depmsg)
991 b95479a5 Michael Hanselmann
992 b95479a5 Michael Hanselmann
      if depresult == _JobDependencyManager.CONTINUE:
993 b95479a5 Michael Hanselmann
        # Remove dependency and continue
994 b95479a5 Michael Hanselmann
        opctx.jobdeps.pop(0)
995 b95479a5 Michael Hanselmann
996 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.WAIT:
997 b95479a5 Michael Hanselmann
        # Need to wait for notification, dependency tracker will re-add job
998 b95479a5 Michael Hanselmann
        # to workerpool
999 b95479a5 Michael Hanselmann
        result = True
1000 b95479a5 Michael Hanselmann
        break
1001 b95479a5 Michael Hanselmann
1002 b95479a5 Michael Hanselmann
      elif depresult == _JobDependencyManager.CANCEL:
1003 b95479a5 Michael Hanselmann
        # Job was cancelled, cancel this job as well
1004 b95479a5 Michael Hanselmann
        job.Cancel()
1005 b95479a5 Michael Hanselmann
        assert op.status == constants.OP_STATUS_CANCELING
1006 b95479a5 Michael Hanselmann
        break
1007 b95479a5 Michael Hanselmann
1008 b95479a5 Michael Hanselmann
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1009 b95479a5 Michael Hanselmann
                         _JobDependencyManager.ERROR):
1010 b95479a5 Michael Hanselmann
        # Job failed or there was an error, this job must fail
1011 b95479a5 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
1012 b95479a5 Michael Hanselmann
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1013 b95479a5 Michael Hanselmann
        break
1014 b95479a5 Michael Hanselmann
1015 b95479a5 Michael Hanselmann
      else:
1016 b95479a5 Michael Hanselmann
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1017 b95479a5 Michael Hanselmann
                                     depresult)
1018 b95479a5 Michael Hanselmann
1019 b95479a5 Michael Hanselmann
    return result
1020 b95479a5 Michael Hanselmann
1021 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1022 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1023 be760ba8 Michael Hanselmann

1024 be760ba8 Michael Hanselmann
    """
1025 b80cc518 Michael Hanselmann
    op = opctx.op
1026 b80cc518 Michael Hanselmann
1027 47099cd1 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITING
1028 be760ba8 Michael Hanselmann
1029 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1030 26d3fd2f Michael Hanselmann
1031 be760ba8 Michael Hanselmann
    try:
1032 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1033 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1034 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1035 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
1036 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1037 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1038 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1039 9e49dfc5 Michael Hanselmann
1040 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1041 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1042 9e49dfc5 Michael Hanselmann
1043 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1044 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1045 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1046 9e49dfc5 Michael Hanselmann
1047 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1048 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1049 be760ba8 Michael Hanselmann
    except CancelJob:
1050 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1051 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1052 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1053 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1054 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1055 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1056 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1057 be760ba8 Michael Hanselmann
    else:
1058 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1059 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1060 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1061 be760ba8 Michael Hanselmann
1062 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1063 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1064 be760ba8 Michael Hanselmann

1065 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1066 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1067 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1068 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1069 be760ba8 Michael Hanselmann

1070 be760ba8 Michael Hanselmann
    """
1071 be760ba8 Michael Hanselmann
    queue = self.queue
1072 be760ba8 Michael Hanselmann
    job = self.job
1073 be760ba8 Michael Hanselmann
1074 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1075 be760ba8 Michael Hanselmann
1076 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
1077 be760ba8 Michael Hanselmann
    try:
1078 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1079 be760ba8 Michael Hanselmann
1080 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1081 c0f6d0d8 Michael Hanselmann
1082 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1083 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1084 75d81fc8 Michael Hanselmann
        return self.FINISHED
1085 66bd7445 Michael Hanselmann
1086 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1087 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1088 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
1089 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1090 26d3fd2f Michael Hanselmann
      else:
1091 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
1092 26d3fd2f Michael Hanselmann
          _nextop_fn()
1093 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1094 26d3fd2f Michael Hanselmann
1095 b80cc518 Michael Hanselmann
      op = opctx.op
1096 be760ba8 Michael Hanselmann
1097 be760ba8 Michael Hanselmann
      # Consistency check
1098 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1099 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1100 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1101 be760ba8 Michael Hanselmann
1102 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1103 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1104 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1105 be760ba8 Michael Hanselmann
1106 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1107 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1108 26d3fd2f Michael Hanselmann
1109 b95479a5 Michael Hanselmann
      waitjob = None
1110 b95479a5 Michael Hanselmann
1111 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1112 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1113 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1114 30c945d0 Michael Hanselmann
1115 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1116 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1117 5fd6b694 Michael Hanselmann
          # Write to disk
1118 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1119 be760ba8 Michael Hanselmann
1120 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1121 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1122 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1123 b95479a5 Michael Hanselmann
        assert waitjob is None
1124 b95479a5 Michael Hanselmann
1125 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1126 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1127 be760ba8 Michael Hanselmann
1128 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1129 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1130 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1131 be760ba8 Michael Hanselmann
1132 b95479a5 Michael Hanselmann
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1133 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1134 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1135 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1136 be760ba8 Michael Hanselmann
1137 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1138 b95479a5 Michael Hanselmann
1139 b95479a5 Michael Hanselmann
          queue.release()
1140 b95479a5 Michael Hanselmann
          try:
1141 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1142 b95479a5 Michael Hanselmann
          finally:
1143 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1144 b95479a5 Michael Hanselmann
1145 b95479a5 Michael Hanselmann
          op.status = op_status
1146 b95479a5 Michael Hanselmann
          op.result = op_result
1147 b95479a5 Michael Hanselmann
1148 b95479a5 Michael Hanselmann
          assert not waitjob
1149 be760ba8 Michael Hanselmann
1150 47099cd1 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITING:
1151 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1152 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1153 be760ba8 Michael Hanselmann
        else:
1154 26d3fd2f Michael Hanselmann
          # Finalize opcode
1155 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1156 be760ba8 Michael Hanselmann
1157 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1158 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1159 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1160 26d3fd2f Michael Hanselmann
          else:
1161 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1162 be760ba8 Michael Hanselmann
1163 47099cd1 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1164 be760ba8 Michael Hanselmann
        finalize = False
1165 be760ba8 Michael Hanselmann
1166 b95479a5 Michael Hanselmann
        if not waitjob and opctx.CheckPriorityIncrease():
1167 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1168 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1169 be760ba8 Michael Hanselmann
1170 26d3fd2f Michael Hanselmann
        # Keep around for another round
1171 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1172 be760ba8 Michael Hanselmann
1173 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1174 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1175 be760ba8 Michael Hanselmann
1176 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1177 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1178 be760ba8 Michael Hanselmann
1179 be760ba8 Michael Hanselmann
      else:
1180 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1181 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1182 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1183 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1184 26d3fd2f Michael Hanselmann
1185 26d3fd2f Michael Hanselmann
        # Reset context
1186 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1187 26d3fd2f Michael Hanselmann
1188 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1189 26d3fd2f Michael Hanselmann
          finalize = False
1190 26d3fd2f Michael Hanselmann
1191 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1192 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1193 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1194 26d3fd2f Michael Hanselmann
1195 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1196 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1197 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1198 26d3fd2f Michael Hanselmann
          finalize = True
1199 be760ba8 Michael Hanselmann
1200 26d3fd2f Michael Hanselmann
          # Consistency check
1201 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1202 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1203 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1204 be760ba8 Michael Hanselmann
1205 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1206 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1207 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1208 26d3fd2f Michael Hanselmann
          finalize = True
1209 26d3fd2f Michael Hanselmann
1210 26d3fd2f Michael Hanselmann
        else:
1211 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1212 26d3fd2f Michael Hanselmann
1213 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1214 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1215 66bd7445 Michael Hanselmann
          finalize = True
1216 66bd7445 Michael Hanselmann
1217 66bd7445 Michael Hanselmann
        if finalize:
1218 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1219 66bd7445 Michael Hanselmann
          job.Finalize()
1220 26d3fd2f Michael Hanselmann
1221 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1222 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1223 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1224 be760ba8 Michael Hanselmann
1225 b95479a5 Michael Hanselmann
        assert not waitjob
1226 b95479a5 Michael Hanselmann
1227 66bd7445 Michael Hanselmann
        if finalize:
1228 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1229 75d81fc8 Michael Hanselmann
          return self.FINISHED
1230 be760ba8 Michael Hanselmann
1231 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1232 b95479a5 Michael Hanselmann
1233 75d81fc8 Michael Hanselmann
      if waitjob:
1234 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1235 75d81fc8 Michael Hanselmann
      else:
1236 75d81fc8 Michael Hanselmann
        return self.DEFER
1237 be760ba8 Michael Hanselmann
    finally:
1238 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1239 be760ba8 Michael Hanselmann
      queue.release()
1240 be760ba8 Michael Hanselmann
1241 be760ba8 Michael Hanselmann
1242 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
1243 df5a5730 Michael Hanselmann
  """Looks at a result from L{_JobProcessor} for a job.
1244 df5a5730 Michael Hanselmann

1245 df5a5730 Michael Hanselmann
  To be used in a L{_JobQueueWorker}.
1246 df5a5730 Michael Hanselmann

1247 df5a5730 Michael Hanselmann
  """
1248 df5a5730 Michael Hanselmann
  if result == _JobProcessor.FINISHED:
1249 df5a5730 Michael Hanselmann
    # Notify waiting jobs
1250 df5a5730 Michael Hanselmann
    depmgr.NotifyWaiters(job.id)
1251 df5a5730 Michael Hanselmann
1252 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.DEFER:
1253 df5a5730 Michael Hanselmann
    # Schedule again
1254 df5a5730 Michael Hanselmann
    raise workerpool.DeferTask(priority=job.CalcPriority())
1255 df5a5730 Michael Hanselmann
1256 df5a5730 Michael Hanselmann
  elif result == _JobProcessor.WAITDEP:
1257 df5a5730 Michael Hanselmann
    # No-op, dependency manager will re-schedule
1258 df5a5730 Michael Hanselmann
    pass
1259 df5a5730 Michael Hanselmann
1260 df5a5730 Michael Hanselmann
  else:
1261 df5a5730 Michael Hanselmann
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1262 df5a5730 Michael Hanselmann
                                 (result, ))
1263 df5a5730 Michael Hanselmann
1264 df5a5730 Michael Hanselmann
1265 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
1266 031a3e57 Michael Hanselmann
  """The actual job workers.
1267 031a3e57 Michael Hanselmann

1268 031a3e57 Michael Hanselmann
  """
1269 b459a848 Andrea Spadaccini
  def RunTask(self, job): # pylint: disable=W0221
1270 e2715f69 Michael Hanselmann
    """Job executor.
1271 e2715f69 Michael Hanselmann

1272 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1273 ea03467c Iustin Pop
    @param job: the job to be processed
1274 ea03467c Iustin Pop

1275 e2715f69 Michael Hanselmann
    """
1276 f8a4adfa Michael Hanselmann
    assert job.writable, "Expected writable job"
1277 f8a4adfa Michael Hanselmann
1278 b95479a5 Michael Hanselmann
    # Ensure only one worker is active on a single job. If a job registers for
1279 b95479a5 Michael Hanselmann
    # a dependency job, and the other job notifies before the first worker is
1280 b95479a5 Michael Hanselmann
    # done, the job can end up in the tasklist more than once.
1281 b95479a5 Michael Hanselmann
    job.processor_lock.acquire()
1282 b95479a5 Michael Hanselmann
    try:
1283 b95479a5 Michael Hanselmann
      return self._RunTaskInner(job)
1284 b95479a5 Michael Hanselmann
    finally:
1285 b95479a5 Michael Hanselmann
      job.processor_lock.release()
1286 b95479a5 Michael Hanselmann
1287 b95479a5 Michael Hanselmann
  def _RunTaskInner(self, job):
1288 b95479a5 Michael Hanselmann
    """Executes a job.
1289 b95479a5 Michael Hanselmann

1290 b95479a5 Michael Hanselmann
    Must be called with per-job lock acquired.
1291 b95479a5 Michael Hanselmann

1292 b95479a5 Michael Hanselmann
    """
1293 be760ba8 Michael Hanselmann
    queue = job.queue
1294 be760ba8 Michael Hanselmann
    assert queue == self.pool.queue
1295 be760ba8 Michael Hanselmann
1296 0aeeb6e3 Michael Hanselmann
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1297 0aeeb6e3 Michael Hanselmann
    setname_fn(None)
1298 daba67c7 Michael Hanselmann
1299 be760ba8 Michael Hanselmann
    proc = mcpu.Processor(queue.context, job.id)
1300 be760ba8 Michael Hanselmann
1301 0aeeb6e3 Michael Hanselmann
    # Create wrapper for setting thread name
1302 0aeeb6e3 Michael Hanselmann
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1303 0aeeb6e3 Michael Hanselmann
                                    proc.ExecOpCode)
1304 0aeeb6e3 Michael Hanselmann
1305 df5a5730 Michael Hanselmann
    _EvaluateJobProcessorResult(queue.depmgr, job,
1306 df5a5730 Michael Hanselmann
                                _JobProcessor(queue, wrap_execop_fn, job)())
1307 75d81fc8 Michael Hanselmann
1308 0aeeb6e3 Michael Hanselmann
  @staticmethod
1309 0aeeb6e3 Michael Hanselmann
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1310 0aeeb6e3 Michael Hanselmann
    """Updates the worker thread name to include a short summary of the opcode.
1311 0aeeb6e3 Michael Hanselmann

1312 0aeeb6e3 Michael Hanselmann
    @param setname_fn: Callable setting worker thread name
1313 0aeeb6e3 Michael Hanselmann
    @param execop_fn: Callable for executing opcode (usually
1314 0aeeb6e3 Michael Hanselmann
                      L{mcpu.Processor.ExecOpCode})
1315 0aeeb6e3 Michael Hanselmann

1316 0aeeb6e3 Michael Hanselmann
    """
1317 0aeeb6e3 Michael Hanselmann
    setname_fn(op)
1318 0aeeb6e3 Michael Hanselmann
    try:
1319 0aeeb6e3 Michael Hanselmann
      return execop_fn(op, *args, **kwargs)
1320 0aeeb6e3 Michael Hanselmann
    finally:
1321 0aeeb6e3 Michael Hanselmann
      setname_fn(None)
1322 0aeeb6e3 Michael Hanselmann
1323 0aeeb6e3 Michael Hanselmann
  @staticmethod
1324 0aeeb6e3 Michael Hanselmann
  def _GetWorkerName(job, op):
1325 0aeeb6e3 Michael Hanselmann
    """Sets the worker thread name.
1326 0aeeb6e3 Michael Hanselmann

1327 0aeeb6e3 Michael Hanselmann
    @type job: L{_QueuedJob}
1328 0aeeb6e3 Michael Hanselmann
    @type op: L{opcodes.OpCode}
1329 0aeeb6e3 Michael Hanselmann

1330 0aeeb6e3 Michael Hanselmann
    """
1331 0aeeb6e3 Michael Hanselmann
    parts = ["Job%s" % job.id]
1332 0aeeb6e3 Michael Hanselmann
1333 0aeeb6e3 Michael Hanselmann
    if op:
1334 0aeeb6e3 Michael Hanselmann
      parts.append(op.TinySummary())
1335 0aeeb6e3 Michael Hanselmann
1336 0aeeb6e3 Michael Hanselmann
    return "/".join(parts)
1337 0aeeb6e3 Michael Hanselmann
1338 e2715f69 Michael Hanselmann
1339 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
1340 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
1341 ea03467c Iustin Pop

1342 ea03467c Iustin Pop
  """
1343 5bdce580 Michael Hanselmann
  def __init__(self, queue):
1344 0aeeb6e3 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("Jq",
1345 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
1346 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
1347 5bdce580 Michael Hanselmann
    self.queue = queue
1348 e2715f69 Michael Hanselmann
1349 e2715f69 Michael Hanselmann
1350 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1351 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1352 b95479a5 Michael Hanselmann

1353 b95479a5 Michael Hanselmann
  """
1354 b95479a5 Michael Hanselmann
  (WAIT,
1355 b95479a5 Michael Hanselmann
   ERROR,
1356 b95479a5 Michael Hanselmann
   CANCEL,
1357 b95479a5 Michael Hanselmann
   CONTINUE,
1358 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1359 b95479a5 Michael Hanselmann
1360 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1361 b95479a5 Michael Hanselmann
    """Initializes this class.
1362 b95479a5 Michael Hanselmann

1363 b95479a5 Michael Hanselmann
    """
1364 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1365 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1366 b95479a5 Michael Hanselmann
1367 b95479a5 Michael Hanselmann
    self._waiters = {}
1368 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1369 b95479a5 Michael Hanselmann
1370 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1371 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1372 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1373 fcb21ad7 Michael Hanselmann

1374 fcb21ad7 Michael Hanselmann
    @type requested: set
1375 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1376 fcb21ad7 Michael Hanselmann

1377 fcb21ad7 Michael Hanselmann
    """
1378 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1379 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1380 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1381 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1382 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1383 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1384 fcb21ad7 Michael Hanselmann
            if waiters]
1385 fcb21ad7 Michael Hanselmann
1386 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1387 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1388 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1389 b95479a5 Michael Hanselmann

1390 b95479a5 Michael Hanselmann
    """
1391 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1392 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1393 b95479a5 Michael Hanselmann
1394 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1395 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1396 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1397 b95479a5 Michael Hanselmann

1398 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1399 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1400 b95479a5 Michael Hanselmann

1401 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1402 b95479a5 Michael Hanselmann
    @param job: Job object
1403 76b62028 Iustin Pop
    @type dep_job_id: int
1404 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1405 b95479a5 Michael Hanselmann
    @type dep_status: list
1406 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1407 b95479a5 Michael Hanselmann

1408 b95479a5 Michael Hanselmann
    """
1409 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1410 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1411 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1412 b95479a5 Michael Hanselmann
1413 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1414 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1415 b95479a5 Michael Hanselmann
1416 b95479a5 Michael Hanselmann
    # Get status of dependency job
1417 b95479a5 Michael Hanselmann
    try:
1418 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1419 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1420 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1421 b95479a5 Michael Hanselmann
1422 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1423 b95479a5 Michael Hanselmann
1424 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1425 b95479a5 Michael Hanselmann
1426 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1427 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1428 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1429 b95479a5 Michael Hanselmann
      return (self.WAIT,
1430 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1431 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1432 b95479a5 Michael Hanselmann
1433 b95479a5 Michael Hanselmann
    # Remove from waiters list
1434 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1435 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1436 b95479a5 Michael Hanselmann
1437 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1438 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1439 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1440 b95479a5 Michael Hanselmann
1441 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1442 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1443 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1444 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1445 b95479a5 Michael Hanselmann
1446 b95479a5 Michael Hanselmann
    else:
1447 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1448 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1449 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1450 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1451 b95479a5 Michael Hanselmann
1452 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1453 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1454 37d76f1e Michael Hanselmann

1455 37d76f1e Michael Hanselmann
    """
1456 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1457 37d76f1e Michael Hanselmann
                   if not waiters]:
1458 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1459 37d76f1e Michael Hanselmann
1460 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1461 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1462 b95479a5 Michael Hanselmann

1463 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1464 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1465 76b62028 Iustin Pop
    @type job_id: int
1466 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1467 b95479a5 Michael Hanselmann

1468 b95479a5 Michael Hanselmann
    """
1469 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1470 b95479a5 Michael Hanselmann
1471 37d76f1e Michael Hanselmann
    self._lock.acquire()
1472 37d76f1e Michael Hanselmann
    try:
1473 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1474 37d76f1e Michael Hanselmann
1475 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1476 37d76f1e Michael Hanselmann
    finally:
1477 37d76f1e Michael Hanselmann
      self._lock.release()
1478 37d76f1e Michael Hanselmann
1479 b95479a5 Michael Hanselmann
    if jobs:
1480 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1481 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1482 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1483 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1484 b95479a5 Michael Hanselmann
1485 b95479a5 Michael Hanselmann
1486 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1487 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1488 ea03467c Iustin Pop

1489 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1490 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1491 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1492 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1493 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1494 ea03467c Iustin Pop

1495 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1496 f1da30e6 Michael Hanselmann

1497 6c881c52 Iustin Pop
  Example::
1498 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1499 6c881c52 Iustin Pop
    @_RequireOpenQueue
1500 6c881c52 Iustin Pop
    def Example(self):
1501 6c881c52 Iustin Pop
      pass
1502 db37da70 Michael Hanselmann

1503 6c881c52 Iustin Pop
  """
1504 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1505 b459a848 Andrea Spadaccini
    # pylint: disable=W0212
1506 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1507 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1508 6c881c52 Iustin Pop
  return wrapper
1509 db37da70 Michael Hanselmann
1510 db37da70 Michael Hanselmann
1511 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1512 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1513 c8d0be94 Michael Hanselmann

1514 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1515 c8d0be94 Michael Hanselmann

1516 c8d0be94 Michael Hanselmann
  """
1517 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1518 c8d0be94 Michael Hanselmann
    """Wrapper function.
1519 c8d0be94 Michael Hanselmann

1520 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1521 c8d0be94 Michael Hanselmann

1522 c8d0be94 Michael Hanselmann
    """
1523 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1524 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1525 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1526 c8d0be94 Michael Hanselmann
    if self._drained:
1527 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1528 6d5ea385 Michael Hanselmann
1529 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1530 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1531 6d5ea385 Michael Hanselmann
1532 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1533 c8d0be94 Michael Hanselmann
  return wrapper
1534 c8d0be94 Michael Hanselmann
1535 c8d0be94 Michael Hanselmann
1536 6c881c52 Iustin Pop
class JobQueue(object):
1537 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1538 db37da70 Michael Hanselmann

1539 6c881c52 Iustin Pop
  """
1540 85f03e0d Michael Hanselmann
  def __init__(self, context):
1541 ea03467c Iustin Pop
    """Constructor for JobQueue.
1542 ea03467c Iustin Pop

1543 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
1544 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
1545 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
1546 ea03467c Iustin Pop
    running).
1547 ea03467c Iustin Pop

1548 ea03467c Iustin Pop
    @type context: GanetiContext
1549 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
1550 ea03467c Iustin Pop
        data and other ganeti objects
1551 ea03467c Iustin Pop

1552 ea03467c Iustin Pop
    """
1553 5bdce580 Michael Hanselmann
    self.context = context
1554 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
1555 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
1556 f1da30e6 Michael Hanselmann
1557 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
1558 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
1559 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
1560 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
1561 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
1562 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
1563 ebb80afa Guido Trotter
1564 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
1565 ebb80afa Guido Trotter
    self.release = self._lock.release
1566 85f03e0d Michael Hanselmann
1567 6d5ea385 Michael Hanselmann
    # Accept jobs by default
1568 6d5ea385 Michael Hanselmann
    self._accepting_jobs = True
1569 6d5ea385 Michael Hanselmann
1570 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
1571 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
1572 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1573 f1da30e6 Michael Hanselmann
1574 04ab05ce Michael Hanselmann
    # Read serial file
1575 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
1576 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
1577 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
1578 c4beba1c Iustin Pop
1579 23752136 Michael Hanselmann
    # Get initial list of nodes
1580 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
1581 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
1582 59303563 Iustin Pop
                       if n.master_candidate)
1583 8e00939c Michael Hanselmann
1584 8e00939c Michael Hanselmann
    # Remove master node
1585 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
1586 23752136 Michael Hanselmann
1587 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
1588 23752136 Michael Hanselmann
1589 6d5ea385 Michael Hanselmann
    self._queue_size = None
1590 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1591 6d5ea385 Michael Hanselmann
    assert ht.TInt(self._queue_size)
1592 ff699aa9 Michael Hanselmann
    self._drained = jstore.CheckDrainFlag()
1593 20571a26 Guido Trotter
1594 b95479a5 Michael Hanselmann
    # Job dependencies
1595 b95479a5 Michael Hanselmann
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1596 b95479a5 Michael Hanselmann
                                        self._EnqueueJobs)
1597 fcb21ad7 Michael Hanselmann
    self.context.glm.AddToLockMonitor(self.depmgr)
1598 b95479a5 Michael Hanselmann
1599 85f03e0d Michael Hanselmann
    # Setup worker pool
1600 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
1601 85f03e0d Michael Hanselmann
    try:
1602 de9d02c7 Michael Hanselmann
      self._InspectQueue()
1603 de9d02c7 Michael Hanselmann
    except:
1604 de9d02c7 Michael Hanselmann
      self._wpool.TerminateWorkers()
1605 de9d02c7 Michael Hanselmann
      raise
1606 711b5124 Michael Hanselmann
1607 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1608 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
1609 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
1610 de9d02c7 Michael Hanselmann
    """Loads the whole job queue and resumes unfinished jobs.
1611 de9d02c7 Michael Hanselmann

1612 de9d02c7 Michael Hanselmann
    This function needs the lock here because WorkerPool.AddTask() may start a
1613 de9d02c7 Michael Hanselmann
    job while we're still doing our work.
1614 711b5124 Michael Hanselmann

1615 de9d02c7 Michael Hanselmann
    """
1616 de9d02c7 Michael Hanselmann
    logging.info("Inspecting job queue")
1617 de9d02c7 Michael Hanselmann
1618 7b5c4a69 Michael Hanselmann
    restartjobs = []
1619 7b5c4a69 Michael Hanselmann
1620 de9d02c7 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked()
1621 de9d02c7 Michael Hanselmann
    jobs_count = len(all_job_ids)
1622 de9d02c7 Michael Hanselmann
    lastinfo = time.time()
1623 de9d02c7 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1624 de9d02c7 Michael Hanselmann
      # Give an update every 1000 jobs or 10 seconds
1625 de9d02c7 Michael Hanselmann
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1626 de9d02c7 Michael Hanselmann
          idx == (jobs_count - 1)):
1627 de9d02c7 Michael Hanselmann
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1628 de9d02c7 Michael Hanselmann
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1629 711b5124 Michael Hanselmann
        lastinfo = time.time()
1630 94ed59a5 Iustin Pop
1631 de9d02c7 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1632 85f03e0d Michael Hanselmann
1633 de9d02c7 Michael Hanselmann
      # a failure in loading the job can cause 'None' to be returned
1634 de9d02c7 Michael Hanselmann
      if job is None:
1635 de9d02c7 Michael Hanselmann
        continue
1636 85f03e0d Michael Hanselmann
1637 de9d02c7 Michael Hanselmann
      status = job.CalcStatus()
1638 711b5124 Michael Hanselmann
1639 320d1daf Michael Hanselmann
      if status == constants.JOB_STATUS_QUEUED:
1640 7b5c4a69 Michael Hanselmann
        restartjobs.append(job)
1641 de9d02c7 Michael Hanselmann
1642 de9d02c7 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING,
1643 47099cd1 Michael Hanselmann
                      constants.JOB_STATUS_WAITING,
1644 de9d02c7 Michael Hanselmann
                      constants.JOB_STATUS_CANCELING):
1645 de9d02c7 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
1646 320d1daf Michael Hanselmann
1647 47099cd1 Michael Hanselmann
        if status == constants.JOB_STATUS_WAITING:
1648 320d1daf Michael Hanselmann
          # Restart job
1649 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1650 320d1daf Michael Hanselmann
          restartjobs.append(job)
1651 320d1daf Michael Hanselmann
        else:
1652 320d1daf Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1653 320d1daf Michael Hanselmann
                                "Unclean master daemon shutdown")
1654 45df0793 Michael Hanselmann
          job.Finalize()
1655 320d1daf Michael Hanselmann
1656 de9d02c7 Michael Hanselmann
        self.UpdateJobUnlocked(job)
1657 de9d02c7 Michael Hanselmann
1658 7b5c4a69 Michael Hanselmann
    if restartjobs:
1659 7b5c4a69 Michael Hanselmann
      logging.info("Restarting %s jobs", len(restartjobs))
1660 75d81fc8 Michael Hanselmann
      self._EnqueueJobsUnlocked(restartjobs)
1661 7b5c4a69 Michael Hanselmann
1662 de9d02c7 Michael Hanselmann
    logging.info("Job queue inspection finished")
1663 85f03e0d Michael Hanselmann
1664 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
1665 fb1ffbca Michael Hanselmann
    """Gets RPC runner with context.
1666 fb1ffbca Michael Hanselmann

1667 fb1ffbca Michael Hanselmann
    """
1668 fb1ffbca Michael Hanselmann
    return rpc.JobQueueRunner(self.context, address_list)
1669 fb1ffbca Michael Hanselmann
1670 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1671 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1672 99aabbed Iustin Pop
  def AddNode(self, node):
1673 99aabbed Iustin Pop
    """Register a new node with the queue.
1674 99aabbed Iustin Pop

1675 99aabbed Iustin Pop
    @type node: L{objects.Node}
1676 99aabbed Iustin Pop
    @param node: the node object to be added
1677 99aabbed Iustin Pop

1678 99aabbed Iustin Pop
    """
1679 99aabbed Iustin Pop
    node_name = node.name
1680 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1681 23752136 Michael Hanselmann
1682 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1683 fb1ffbca Michael Hanselmann
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1684 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1685 c8457ce7 Iustin Pop
    if msg:
1686 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1687 c8457ce7 Iustin Pop
                      node_name, msg)
1688 23752136 Michael Hanselmann
1689 59303563 Iustin Pop
    if not node.master_candidate:
1690 59303563 Iustin Pop
      # remove if existing, ignoring errors
1691 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1692 59303563 Iustin Pop
      # and skip the replication of the job ids
1693 59303563 Iustin Pop
      return
1694 59303563 Iustin Pop
1695 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1696 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1697 23752136 Michael Hanselmann
1698 d2e03a33 Michael Hanselmann
    # Upload current serial file
1699 e2b4a7ba Michael Hanselmann
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1700 d2e03a33 Michael Hanselmann
1701 fb1ffbca Michael Hanselmann
    # Static address list
1702 fb1ffbca Michael Hanselmann
    addrs = [node.primary_ip]
1703 fb1ffbca Michael Hanselmann
1704 d2e03a33 Michael Hanselmann
    for file_name in files:
1705 9f774ee8 Michael Hanselmann
      # Read file content
1706 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1707 9f774ee8 Michael Hanselmann
1708 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1709 cffbbae7 Michael Hanselmann
                             file_name, content)
1710 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1711 c8457ce7 Iustin Pop
      if msg:
1712 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1713 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1714 d2e03a33 Michael Hanselmann
1715 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1716 d2e03a33 Michael Hanselmann
1717 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1718 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1719 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1720 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1721 ea03467c Iustin Pop

1722 ea03467c Iustin Pop
    @type node_name: str
1723 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1724 ea03467c Iustin Pop

1725 ea03467c Iustin Pop
    """
1726 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1727 23752136 Michael Hanselmann
1728 7e950d31 Iustin Pop
  @staticmethod
1729 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1730 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1731 ea03467c Iustin Pop

1732 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1733 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1734 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1735 ea03467c Iustin Pop

1736 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1737 ea03467c Iustin Pop
    @type nodes: list
1738 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1739 ea03467c Iustin Pop
    @type failmsg: str
1740 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1741 ea03467c Iustin Pop

1742 ea03467c Iustin Pop
    """
1743 e74798c1 Michael Hanselmann
    failed = []
1744 e74798c1 Michael Hanselmann
    success = []
1745 e74798c1 Michael Hanselmann
1746 e74798c1 Michael Hanselmann
    for node in nodes:
1747 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1748 c8457ce7 Iustin Pop
      if msg:
1749 e74798c1 Michael Hanselmann
        failed.append(node)
1750 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1751 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1752 c8457ce7 Iustin Pop
      else:
1753 c8457ce7 Iustin Pop
        success.append(node)
1754 e74798c1 Michael Hanselmann
1755 e74798c1 Michael Hanselmann
    # +1 for the master node
1756 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1757 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1758 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1759 e74798c1 Michael Hanselmann
1760 99aabbed Iustin Pop
  def _GetNodeIp(self):
1761 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1762 99aabbed Iustin Pop

1763 ea03467c Iustin Pop
    @rtype: (list, list)
1764 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1765 ea03467c Iustin Pop
        names and the second one with the node addresses
1766 ea03467c Iustin Pop

1767 99aabbed Iustin Pop
    """
1768 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1769 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1770 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1771 99aabbed Iustin Pop
    return name_list, addr_list
1772 99aabbed Iustin Pop
1773 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1774 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1775 8e00939c Michael Hanselmann

1776 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1777 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1778 ea03467c Iustin Pop

1779 ea03467c Iustin Pop
    @type file_name: str
1780 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1781 ea03467c Iustin Pop
    @type data: str
1782 ea03467c Iustin Pop
    @param data: the new contents of the file
1783 4c36bdf5 Guido Trotter
    @type replicate: boolean
1784 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1785 ea03467c Iustin Pop

1786 8e00939c Michael Hanselmann
    """
1787 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1788 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1789 82b22e19 René Nussbaumer
                    gid=getents.masterd_gid)
1790 8e00939c Michael Hanselmann
1791 4c36bdf5 Guido Trotter
    if replicate:
1792 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1793 cffbbae7 Michael Hanselmann
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1794 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1795 23752136 Michael Hanselmann
1796 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1797 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1798 ea03467c Iustin Pop

1799 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1800 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1801 ea03467c Iustin Pop

1802 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1803 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1804 ea03467c Iustin Pop

1805 ea03467c Iustin Pop
    """
1806 dd875d32 Michael Hanselmann
    # Rename them locally
1807 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1808 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1809 abc1f2ce Michael Hanselmann
1810 dd875d32 Michael Hanselmann
    # ... and on all nodes
1811 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1812 fb1ffbca Michael Hanselmann
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1813 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1814 abc1f2ce Michael Hanselmann
1815 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1816 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1817 f1da30e6 Michael Hanselmann

1818 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1819 f1da30e6 Michael Hanselmann

1820 009e73d0 Iustin Pop
    @type count: integer
1821 009e73d0 Iustin Pop
    @param count: how many serials to return
1822 76b62028 Iustin Pop
    @rtype: list of int
1823 76b62028 Iustin Pop
    @return: a list of job identifiers.
1824 f1da30e6 Michael Hanselmann

1825 f1da30e6 Michael Hanselmann
    """
1826 719f8fba Michael Hanselmann
    assert ht.TPositiveInt(count)
1827 719f8fba Michael Hanselmann
1828 f1da30e6 Michael Hanselmann
    # New number
1829 009e73d0 Iustin Pop
    serial = self._last_serial + count
1830 f1da30e6 Michael Hanselmann
1831 f1da30e6 Michael Hanselmann
    # Write to file
1832 e2b4a7ba Michael Hanselmann
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1833 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1834 f1da30e6 Michael Hanselmann
1835 1410a389 Michael Hanselmann
    result = [jstore.FormatJobID(v)
1836 3c88bf36 Michael Hanselmann
              for v in range(self._last_serial + 1, serial + 1)]
1837 3c88bf36 Michael Hanselmann
1838 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1839 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1840 f1da30e6 Michael Hanselmann
1841 3c88bf36 Michael Hanselmann
    assert len(result) == count
1842 3c88bf36 Michael Hanselmann
1843 009e73d0 Iustin Pop
    return result
1844 f1da30e6 Michael Hanselmann
1845 85f03e0d Michael Hanselmann
  @staticmethod
1846 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1847 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1848 ea03467c Iustin Pop

1849 ea03467c Iustin Pop
    @type job_id: str
1850 ea03467c Iustin Pop
    @param job_id: the job identifier
1851 ea03467c Iustin Pop
    @rtype: str
1852 ea03467c Iustin Pop
    @return: the path to the job file
1853 ea03467c Iustin Pop

1854 ea03467c Iustin Pop
    """
1855 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1856 f1da30e6 Michael Hanselmann
1857 1410a389 Michael Hanselmann
  @staticmethod
1858 1410a389 Michael Hanselmann
  def _GetArchivedJobPath(job_id):
1859 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1860 ea03467c Iustin Pop

1861 ea03467c Iustin Pop
    @type job_id: str
1862 ea03467c Iustin Pop
    @param job_id: the job identifier
1863 ea03467c Iustin Pop
    @rtype: str
1864 ea03467c Iustin Pop
    @return: the path to the archived job file
1865 ea03467c Iustin Pop

1866 ea03467c Iustin Pop
    """
1867 e2b4a7ba Michael Hanselmann
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1868 1410a389 Michael Hanselmann
                          jstore.GetArchiveDirectory(job_id),
1869 1410a389 Michael Hanselmann
                          "job-%s" % job_id)
1870 0cb94105 Michael Hanselmann
1871 cb66225d Michael Hanselmann
  @staticmethod
1872 0422250e Michael Hanselmann
  def _DetermineJobDirectories(archived):
1873 bb921668 Michael Hanselmann
    """Build list of directories containing job files.
1874 bb921668 Michael Hanselmann

1875 bb921668 Michael Hanselmann
    @type archived: bool
1876 bb921668 Michael Hanselmann
    @param archived: Whether to include directories for archived jobs
1877 bb921668 Michael Hanselmann
    @rtype: list
1878 bb921668 Michael Hanselmann

1879 bb921668 Michael Hanselmann
    """
1880 0422250e Michael Hanselmann
    result = [pathutils.QUEUE_DIR]
1881 0422250e Michael Hanselmann
1882 0422250e Michael Hanselmann
    if archived:
1883 0422250e Michael Hanselmann
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1884 0422250e Michael Hanselmann
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
1885 0422250e Michael Hanselmann
                        utils.ListVisibleFiles(archive_path)))
1886 0422250e Michael Hanselmann
1887 0422250e Michael Hanselmann
    return result
1888 0422250e Michael Hanselmann
1889 0422250e Michael Hanselmann
  @classmethod
1890 0422250e Michael Hanselmann
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1891 911a495b Iustin Pop
    """Return all known job IDs.
1892 911a495b Iustin Pop

1893 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1894 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1895 ac0930b9 Iustin Pop
    extra IDs).
1896 ac0930b9 Iustin Pop

1897 85a1c57d Guido Trotter
    @type sort: boolean
1898 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1899 ea03467c Iustin Pop
    @rtype: list
1900 ea03467c Iustin Pop
    @return: the list of job IDs
1901 ea03467c Iustin Pop

1902 911a495b Iustin Pop
    """
1903 85a1c57d Guido Trotter
    jlist = []
1904 0422250e Michael Hanselmann
1905 0422250e Michael Hanselmann
    for path in cls._DetermineJobDirectories(archived):
1906 0422250e Michael Hanselmann
      for filename in utils.ListVisibleFiles(path):
1907 0422250e Michael Hanselmann
        m = constants.JOB_FILE_RE.match(filename)
1908 0422250e Michael Hanselmann
        if m:
1909 0422250e Michael Hanselmann
          jlist.append(int(m.group(1)))
1910 0422250e Michael Hanselmann
1911 85a1c57d Guido Trotter
    if sort:
1912 76b62028 Iustin Pop
      jlist.sort()
1913 f0d874fe Iustin Pop
    return jlist
1914 911a495b Iustin Pop
1915 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1916 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1917 ea03467c Iustin Pop

1918 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1919 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1920 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1921 ea03467c Iustin Pop

1922 76b62028 Iustin Pop
    @type job_id: int
1923 ea03467c Iustin Pop
    @param job_id: the job id
1924 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1925 ea03467c Iustin Pop
    @return: either None or the job object
1926 ea03467c Iustin Pop

1927 ea03467c Iustin Pop
    """
1928 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1929 5685c1a5 Michael Hanselmann
    if job:
1930 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1931 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Found read-only job in memcache"
1932 5685c1a5 Michael Hanselmann
      return job
1933 ac0930b9 Iustin Pop
1934 3d6c5566 Guido Trotter
    try:
1935 194c8ca4 Michael Hanselmann
      job = self._LoadJobFromDisk(job_id, False)
1936 aa9f8167 Iustin Pop
      if job is None:
1937 aa9f8167 Iustin Pop
        return job
1938 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1939 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1940 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1941 3d6c5566 Guido Trotter
      if old_path == new_path:
1942 3d6c5566 Guido Trotter
        # job already archived (future case)
1943 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1944 3d6c5566 Guido Trotter
      else:
1945 3d6c5566 Guido Trotter
        # non-archived case
1946 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1947 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1948 3d6c5566 Guido Trotter
      return None
1949 162c8636 Guido Trotter
1950 c0f6d0d8 Michael Hanselmann
    assert job.writable, "Job just loaded is not writable"
1951 c0f6d0d8 Michael Hanselmann
1952 162c8636 Guido Trotter
    self._memcache[job_id] = job
1953 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1954 162c8636 Guido Trotter
    return job
1955 162c8636 Guido Trotter
1956 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1957 162c8636 Guido Trotter
    """Load the given job file from disk.
1958 162c8636 Guido Trotter

1959 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1960 162c8636 Guido Trotter

1961 76b62028 Iustin Pop
    @type job_id: int
1962 162c8636 Guido Trotter
    @param job_id: job identifier
1963 194c8ca4 Michael Hanselmann
    @type try_archived: bool
1964 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
1965 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1966 162c8636 Guido Trotter
    @return: either None or the job object
1967 162c8636 Guido Trotter

1968 162c8636 Guido Trotter
    """
1969 8a3cd185 Michael Hanselmann
    path_functions = [(self._GetJobPath, False)]
1970 194c8ca4 Michael Hanselmann
1971 194c8ca4 Michael Hanselmann
    if try_archived:
1972 8a3cd185 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, True))
1973 194c8ca4 Michael Hanselmann
1974 194c8ca4 Michael Hanselmann
    raw_data = None
1975 8a3cd185 Michael Hanselmann
    archived = None
1976 194c8ca4 Michael Hanselmann
1977 8a3cd185 Michael Hanselmann
    for (fn, archived) in path_functions:
1978 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
1979 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
1980 194c8ca4 Michael Hanselmann
      try:
1981 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
1982 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
1983 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
1984 194c8ca4 Michael Hanselmann
          raise
1985 194c8ca4 Michael Hanselmann
      else:
1986 194c8ca4 Michael Hanselmann
        break
1987 194c8ca4 Michael Hanselmann
1988 194c8ca4 Michael Hanselmann
    if not raw_data:
1989 194c8ca4 Michael Hanselmann
      return None
1990 13998ef2 Michael Hanselmann
1991 c0f6d0d8 Michael Hanselmann
    if writable is None:
1992 8a3cd185 Michael Hanselmann
      writable = not archived
1993 c0f6d0d8 Michael Hanselmann
1994 94ed59a5 Iustin Pop
    try:
1995 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1996 8a3cd185 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable, archived)
1997 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1998 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1999 94ed59a5 Iustin Pop
2000 ac0930b9 Iustin Pop
    return job
2001 f1da30e6 Michael Hanselmann
2002 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2003 0f9c08dc Guido Trotter
    """Load the given job file from disk.
2004 0f9c08dc Guido Trotter

2005 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2006 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
2007 0f9c08dc Guido Trotter
    exception is logged.
2008 0f9c08dc Guido Trotter

2009 76b62028 Iustin Pop
    @type job_id: int
2010 0f9c08dc Guido Trotter
    @param job_id: job identifier
2011 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2012 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2013 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
2014 0f9c08dc Guido Trotter
    @return: either None or the job object
2015 0f9c08dc Guido Trotter

2016 0f9c08dc Guido Trotter
    """
2017 0f9c08dc Guido Trotter
    try:
2018 c0f6d0d8 Michael Hanselmann
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2019 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
2020 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
2021 0f9c08dc Guido Trotter
      return None
2022 0f9c08dc Guido Trotter
2023 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
2024 20571a26 Guido Trotter
    """Update the queue size.
2025 20571a26 Guido Trotter

2026 20571a26 Guido Trotter
    """
2027 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2028 20571a26 Guido Trotter
2029 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2030 20571a26 Guido Trotter
  @_RequireOpenQueue
2031 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
2032 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
2033 3ccafd0e Iustin Pop

2034 ea03467c Iustin Pop
    @type drain_flag: boolean
2035 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
2036 ea03467c Iustin Pop

2037 3ccafd0e Iustin Pop
    """
2038 ff699aa9 Michael Hanselmann
    jstore.SetDrainFlag(drain_flag)
2039 20571a26 Guido Trotter
2040 20571a26 Guido Trotter
    self._drained = drain_flag
2041 20571a26 Guido Trotter
2042 3ccafd0e Iustin Pop
    return True
2043 3ccafd0e Iustin Pop
2044 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2045 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
2046 85f03e0d Michael Hanselmann
    """Create and store a new job.
2047 f1da30e6 Michael Hanselmann

2048 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
2049 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
2050 c3f0a12f Iustin Pop

2051 009e73d0 Iustin Pop
    @type job_id: job ID
2052 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
2053 c3f0a12f Iustin Pop
    @type ops: list
2054 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
2055 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
2056 7beb1e53 Guido Trotter
    @return: the job object to be queued
2057 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2058 e71c8147 Michael Hanselmann
    @raise errors.GenericError: If an opcode is not valid
2059 c3f0a12f Iustin Pop

2060 c3f0a12f Iustin Pop
    """
2061 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2062 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
2063 f87b405e Michael Hanselmann
2064 c0f6d0d8 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops, True)
2065 f1da30e6 Michael Hanselmann
2066 e71c8147 Michael Hanselmann
    for idx, op in enumerate(job.ops):
2067 42d49574 Michael Hanselmann
      # Check priority
2068 e71c8147 Michael Hanselmann
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2069 e71c8147 Michael Hanselmann
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2070 e71c8147 Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2071 e71c8147 Michael Hanselmann
                                  " are %s" % (idx, op.priority, allowed))
2072 e71c8147 Michael Hanselmann
2073 42d49574 Michael Hanselmann
      # Check job dependencies
2074 b247c6fc Michael Hanselmann
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2075 b247c6fc Michael Hanselmann
      if not opcodes.TNoRelativeJobDependencies(dependencies):
2076 b247c6fc Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2077 b247c6fc Michael Hanselmann
                                  " match %s: %s" %
2078 b247c6fc Michael Hanselmann
                                  (idx, opcodes.TNoRelativeJobDependencies,
2079 b247c6fc Michael Hanselmann
                                   dependencies))
2080 b247c6fc Michael Hanselmann
2081 f1da30e6 Michael Hanselmann
    # Write to disk
2082 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
2083 f1da30e6 Michael Hanselmann
2084 20571a26 Guido Trotter
    self._queue_size += 1
2085 20571a26 Guido Trotter
2086 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
2087 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
2088 ac0930b9 Iustin Pop
2089 7beb1e53 Guido Trotter
    return job
2090 f1da30e6 Michael Hanselmann
2091 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2092 2971c913 Iustin Pop
  @_RequireOpenQueue
2093 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2094 2971c913 Iustin Pop
  def SubmitJob(self, ops):
2095 2971c913 Iustin Pop
    """Create and store a new job.
2096 2971c913 Iustin Pop

2097 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2098 2971c913 Iustin Pop

2099 2971c913 Iustin Pop
    """
2100 b247c6fc Michael Hanselmann
    (job_id, ) = self._NewSerialsUnlocked(1)
2101 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2102 7beb1e53 Guido Trotter
    return job_id
2103 2971c913 Iustin Pop
2104 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2105 2971c913 Iustin Pop
  @_RequireOpenQueue
2106 c8d0be94 Michael Hanselmann
  @_RequireNonDrainedQueue
2107 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
2108 2971c913 Iustin Pop
    """Create and store multiple jobs.
2109 2971c913 Iustin Pop

2110 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
2111 2971c913 Iustin Pop

2112 2971c913 Iustin Pop
    """
2113 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2114 b247c6fc Michael Hanselmann
2115 b247c6fc Michael Hanselmann
    (results, added_jobs) = \
2116 b247c6fc Michael Hanselmann
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2117 7b5c4a69 Michael Hanselmann
2118 75d81fc8 Michael Hanselmann
    self._EnqueueJobsUnlocked(added_jobs)
2119 2971c913 Iustin Pop
2120 2971c913 Iustin Pop
    return results
2121 2971c913 Iustin Pop
2122 b247c6fc Michael Hanselmann
  @staticmethod
2123 b247c6fc Michael Hanselmann
  def _FormatSubmitError(msg, ops):
2124 b247c6fc Michael Hanselmann
    """Formats errors which occurred while submitting a job.
2125 b247c6fc Michael Hanselmann

2126 b247c6fc Michael Hanselmann
    """
2127 b247c6fc Michael Hanselmann
    return ("%s; opcodes %s" %
2128 b247c6fc Michael Hanselmann
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2129 b247c6fc Michael Hanselmann
2130 b247c6fc Michael Hanselmann
  @staticmethod
2131 b247c6fc Michael Hanselmann
  def _ResolveJobDependencies(resolve_fn, deps):
2132 b247c6fc Michael Hanselmann
    """Resolves relative job IDs in dependencies.
2133 b247c6fc Michael Hanselmann

2134 b247c6fc Michael Hanselmann
    @type resolve_fn: callable
2135 b247c6fc Michael Hanselmann
    @param resolve_fn: Function to resolve a relative job ID
2136 b247c6fc Michael Hanselmann
    @type deps: list
2137 b247c6fc Michael Hanselmann
    @param deps: Dependencies
2138 4c27b231 Michael Hanselmann
    @rtype: tuple; (boolean, string or list)
2139 4c27b231 Michael Hanselmann
    @return: If successful (first tuple item), the returned list contains
2140 4c27b231 Michael Hanselmann
      resolved job IDs along with the requested status; if not successful,
2141 4c27b231 Michael Hanselmann
      the second element is an error message
2142 b247c6fc Michael Hanselmann

2143 b247c6fc Michael Hanselmann
    """
2144 b247c6fc Michael Hanselmann
    result = []
2145 b247c6fc Michael Hanselmann
2146 b247c6fc Michael Hanselmann
    for (dep_job_id, dep_status) in deps:
2147 b247c6fc Michael Hanselmann
      if ht.TRelativeJobId(dep_job_id):
2148 b247c6fc Michael Hanselmann
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2149 b247c6fc Michael Hanselmann
        try:
2150 b247c6fc Michael Hanselmann
          job_id = resolve_fn(dep_job_id)
2151 b247c6fc Michael Hanselmann
        except IndexError:
2152 b247c6fc Michael Hanselmann
          # Abort
2153 b247c6fc Michael Hanselmann
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2154 b247c6fc Michael Hanselmann
      else:
2155 b247c6fc Michael Hanselmann
        job_id = dep_job_id
2156 b247c6fc Michael Hanselmann
2157 b247c6fc Michael Hanselmann
      result.append((job_id, dep_status))
2158 b247c6fc Michael Hanselmann
2159 b247c6fc Michael Hanselmann
    return (True, result)
2160 b247c6fc Michael Hanselmann
2161 b247c6fc Michael Hanselmann
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2162 b247c6fc Michael Hanselmann
    """Create and store multiple jobs.
2163 b247c6fc Michael Hanselmann

2164 b247c6fc Michael Hanselmann
    @see: L{_SubmitJobUnlocked}
2165 b247c6fc Michael Hanselmann

2166 b247c6fc Michael Hanselmann
    """
2167 b247c6fc Michael Hanselmann
    results = []
2168 b247c6fc Michael Hanselmann
    added_jobs = []
2169 b247c6fc Michael Hanselmann
2170 b247c6fc Michael Hanselmann
    def resolve_fn(job_idx, reljobid):
2171 b247c6fc Michael Hanselmann
      assert reljobid < 0
2172 b247c6fc Michael Hanselmann
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2173 b247c6fc Michael Hanselmann
2174 b247c6fc Michael Hanselmann
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2175 b247c6fc Michael Hanselmann
      for op in ops:
2176 b247c6fc Michael Hanselmann
        if getattr(op, opcodes.DEPEND_ATTR, None):
2177 b247c6fc Michael Hanselmann
          (status, data) = \
2178 b247c6fc Michael Hanselmann
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2179 b247c6fc Michael Hanselmann
                                         op.depends)
2180 b247c6fc Michael Hanselmann
          if not status:
2181 b247c6fc Michael Hanselmann
            # Abort resolving dependencies
2182 b247c6fc Michael Hanselmann
            assert ht.TNonEmptyString(data), "No error message"
2183 b247c6fc Michael Hanselmann
            break
2184 b247c6fc Michael Hanselmann
          # Use resolved dependencies
2185 b247c6fc Michael Hanselmann
          op.depends = data
2186 b247c6fc Michael Hanselmann
      else:
2187 b247c6fc Michael Hanselmann
        try:
2188 b247c6fc Michael Hanselmann
          job = self._SubmitJobUnlocked(job_id, ops)
2189 b247c6fc Michael Hanselmann
        except errors.GenericError, err:
2190 b247c6fc Michael Hanselmann
          status = False
2191 b247c6fc Michael Hanselmann
          data = self._FormatSubmitError(str(err), ops)
2192 b247c6fc Michael Hanselmann
        else:
2193 b247c6fc Michael Hanselmann
          status = True
2194 b247c6fc Michael Hanselmann
          data = job_id
2195 b247c6fc Michael Hanselmann
          added_jobs.append(job)
2196 b247c6fc Michael Hanselmann
2197 b247c6fc Michael Hanselmann
      results.append((status, data))
2198 b247c6fc Michael Hanselmann
2199 b247c6fc Michael Hanselmann
    return (results, added_jobs)
2200 b247c6fc Michael Hanselmann
2201 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2202 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
2203 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2204 7b5c4a69 Michael Hanselmann

2205 7b5c4a69 Michael Hanselmann
    @type jobs: list
2206 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
2207 7b5c4a69 Michael Hanselmann

2208 7b5c4a69 Michael Hanselmann
    """
2209 75d81fc8 Michael Hanselmann
    return self._EnqueueJobsUnlocked(jobs)
2210 75d81fc8 Michael Hanselmann
2211 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
2212 75d81fc8 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
2213 75d81fc8 Michael Hanselmann

2214 75d81fc8 Michael Hanselmann
    @type jobs: list
2215 75d81fc8 Michael Hanselmann
    @param jobs: List of all jobs
2216 75d81fc8 Michael Hanselmann

2217 75d81fc8 Michael Hanselmann
    """
2218 75d81fc8 Michael Hanselmann
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2219 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2220 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
2221 7b5c4a69 Michael Hanselmann
2222 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2223 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2224 b95479a5 Michael Hanselmann

2225 76b62028 Iustin Pop
    @type job_id: int
2226 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2227 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2228 b95479a5 Michael Hanselmann

2229 b95479a5 Michael Hanselmann
    """
2230 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2231 b95479a5 Michael Hanselmann
2232 b95479a5 Michael Hanselmann
    # Try to load from disk
2233 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2234 c0f6d0d8 Michael Hanselmann
2235 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2236 b95479a5 Michael Hanselmann
2237 b95479a5 Michael Hanselmann
    if job:
2238 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2239 b95479a5 Michael Hanselmann
2240 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2241 b95479a5 Michael Hanselmann
2242 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2243 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
2244 ea03467c Iustin Pop
    """Update a job's on disk storage.
2245 ea03467c Iustin Pop

2246 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
2247 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
2248 ea03467c Iustin Pop
    nodes.
2249 ea03467c Iustin Pop

2250 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
2251 ea03467c Iustin Pop
    @param job: the changed job
2252 4c36bdf5 Guido Trotter
    @type replicate: boolean
2253 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
2254 ea03467c Iustin Pop

2255 ea03467c Iustin Pop
    """
2256 66bd7445 Michael Hanselmann
    if __debug__:
2257 66bd7445 Michael Hanselmann
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2258 66bd7445 Michael Hanselmann
      assert (finalized ^ (job.end_timestamp is None))
2259 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't update read-only job"
2260 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't update archived job"
2261 66bd7445 Michael Hanselmann
2262 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
2263 a182a3ed Michael Hanselmann
    data = serializer.DumpJson(job.Serialize())
2264 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
2265 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
2266 ac0930b9 Iustin Pop
2267 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2268 5c735209 Iustin Pop
                        timeout):
2269 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
2270 6c5a7090 Michael Hanselmann

2271 76b62028 Iustin Pop
    @type job_id: int
2272 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
2273 6c5a7090 Michael Hanselmann
    @type fields: list of strings
2274 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
2275 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
2276 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
2277 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
2278 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
2279 5c735209 Iustin Pop
    @type timeout: float
2280 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
2281 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
2282 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
2283 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
2284 ea03467c Iustin Pop

2285 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
2286 ea03467c Iustin Pop
        we instead return a special value,
2287 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2288 ea03467c Iustin Pop
        as such by the clients
2289 6c5a7090 Michael Hanselmann

2290 6c5a7090 Michael Hanselmann
    """
2291 04569469 Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2292 c0f6d0d8 Michael Hanselmann
                             writable=False)
2293 989a8bee Michael Hanselmann
2294 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
2295 989a8bee Michael Hanselmann
2296 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
2297 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
2298 dfe57c22 Michael Hanselmann
2299 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2300 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2301 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
2302 188c5e0a Michael Hanselmann
    """Cancels a job.
2303 188c5e0a Michael Hanselmann

2304 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
2305 ea03467c Iustin Pop

2306 76b62028 Iustin Pop
    @type job_id: int
2307 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
2308 188c5e0a Michael Hanselmann

2309 188c5e0a Michael Hanselmann
    """
2310 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
2311 188c5e0a Michael Hanselmann
2312 aebd0e4e Michael Hanselmann
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2313 aebd0e4e Michael Hanselmann
2314 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2315 aebd0e4e Michael Hanselmann
    """Modifies a job.
2316 aebd0e4e Michael Hanselmann

2317 aebd0e4e Michael Hanselmann
    @type job_id: int
2318 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2319 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2320 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2321 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2322 aebd0e4e Michael Hanselmann

2323 aebd0e4e Michael Hanselmann
    """
2324 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2325 188c5e0a Michael Hanselmann
    if not job:
2326 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2327 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2328 fbf0262f Michael Hanselmann
2329 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2330 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2331 c0f6d0d8 Michael Hanselmann
2332 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2333 188c5e0a Michael Hanselmann
2334 099b2870 Michael Hanselmann
    if success:
2335 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2336 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2337 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2338 fbf0262f Michael Hanselmann
2339 099b2870 Michael Hanselmann
    return (success, msg)
2340 fbf0262f Michael Hanselmann
2341 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
2342 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2343 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2344 c609f802 Michael Hanselmann

2345 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2346 25e7b43f Iustin Pop
    @param jobs: Job objects
2347 d7fd1f28 Michael Hanselmann
    @rtype: int
2348 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2349 c609f802 Michael Hanselmann

2350 c609f802 Michael Hanselmann
    """
2351 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2352 d7fd1f28 Michael Hanselmann
    rename_files = []
2353 d7fd1f28 Michael Hanselmann
    for job in jobs:
2354 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2355 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't cancel archived job"
2356 c0f6d0d8 Michael Hanselmann
2357 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2358 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2359 d7fd1f28 Michael Hanselmann
        continue
2360 c609f802 Michael Hanselmann
2361 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2362 c609f802 Michael Hanselmann
2363 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2364 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2365 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2366 c609f802 Michael Hanselmann
2367 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2368 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2369 f1da30e6 Michael Hanselmann
2370 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2371 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2372 d7fd1f28 Michael Hanselmann
2373 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2374 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2375 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2376 20571a26 Guido Trotter
    # archived jobs to fix this.
2377 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2378 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2379 78d12585 Michael Hanselmann
2380 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2381 07cd723a Iustin Pop
  @_RequireOpenQueue
2382 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
2383 07cd723a Iustin Pop
    """Archives a job.
2384 07cd723a Iustin Pop

2385 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2386 ea03467c Iustin Pop

2387 76b62028 Iustin Pop
    @type job_id: int
2388 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
2389 78d12585 Michael Hanselmann
    @rtype: bool
2390 78d12585 Michael Hanselmann
    @return: Whether job was archived
2391 07cd723a Iustin Pop

2392 07cd723a Iustin Pop
    """
2393 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
2394 78d12585 Michael Hanselmann
2395 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2396 78d12585 Michael Hanselmann
    if not job:
2397 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2398 78d12585 Michael Hanselmann
      return False
2399 78d12585 Michael Hanselmann
2400 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
2401 07cd723a Iustin Pop
2402 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2403 07cd723a Iustin Pop
  @_RequireOpenQueue
2404 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
2405 07cd723a Iustin Pop
    """Archives all jobs based on age.
2406 07cd723a Iustin Pop

2407 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
2408 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
2409 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
2410 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
2411 07cd723a Iustin Pop

2412 07cd723a Iustin Pop
    @type age: int
2413 07cd723a Iustin Pop
    @param age: the minimum age in seconds
2414 07cd723a Iustin Pop

2415 07cd723a Iustin Pop
    """
2416 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
2417 07cd723a Iustin Pop
2418 07cd723a Iustin Pop
    now = time.time()
2419 f8ad5591 Michael Hanselmann
    end_time = now + timeout
2420 f8ad5591 Michael Hanselmann
    archived_count = 0
2421 f8ad5591 Michael Hanselmann
    last_touched = 0
2422 f8ad5591 Michael Hanselmann
2423 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
2424 d7fd1f28 Michael Hanselmann
    pending = []
2425 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
2426 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
2427 f8ad5591 Michael Hanselmann
2428 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
2429 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
2430 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
2431 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
2432 f8ad5591 Michael Hanselmann
        break
2433 f8ad5591 Michael Hanselmann
2434 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
2435 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
2436 f8ad5591 Michael Hanselmann
      if job:
2437 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
2438 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
2439 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
2440 f8ad5591 Michael Hanselmann
          else:
2441 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
2442 07cd723a Iustin Pop
        else:
2443 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
2444 f8ad5591 Michael Hanselmann
2445 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
2446 d7fd1f28 Michael Hanselmann
          pending.append(job)
2447 d7fd1f28 Michael Hanselmann
2448 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
2449 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
2450 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
2451 d7fd1f28 Michael Hanselmann
            pending = []
2452 f8ad5591 Michael Hanselmann
2453 d7fd1f28 Michael Hanselmann
    if pending:
2454 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
2455 07cd723a Iustin Pop
2456 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
2457 07cd723a Iustin Pop
2458 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
2459 e07f7f7a Michael Hanselmann
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2460 e07f7f7a Michael Hanselmann
                       namefield="id")
2461 e07f7f7a Michael Hanselmann
2462 0422250e Michael Hanselmann
    # Archived jobs are only looked at if the "archived" field is referenced
2463 0422250e Michael Hanselmann
    # either as a requested field or in the filter. By default archived jobs
2464 0422250e Michael Hanselmann
    # are ignored.
2465 0422250e Michael Hanselmann
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2466 0422250e Michael Hanselmann
2467 e07f7f7a Michael Hanselmann
    job_ids = qobj.RequestedNames()
2468 e07f7f7a Michael Hanselmann
2469 e07f7f7a Michael Hanselmann
    list_all = (job_ids is None)
2470 e07f7f7a Michael Hanselmann
2471 e07f7f7a Michael Hanselmann
    if list_all:
2472 e07f7f7a Michael Hanselmann
      # Since files are added to/removed from the queue atomically, there's no
2473 e07f7f7a Michael Hanselmann
      # risk of getting the job ids in an inconsistent state.
2474 0422250e Michael Hanselmann
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2475 e07f7f7a Michael Hanselmann
2476 e07f7f7a Michael Hanselmann
    jobs = []
2477 e07f7f7a Michael Hanselmann
2478 e07f7f7a Michael Hanselmann
    for job_id in job_ids:
2479 e07f7f7a Michael Hanselmann
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2480 e07f7f7a Michael Hanselmann
      if job is not None or not list_all:
2481 e07f7f7a Michael Hanselmann
        jobs.append((job_id, job))
2482 e07f7f7a Michael Hanselmann
2483 e07f7f7a Michael Hanselmann
    return (qobj, jobs, list_all)
2484 e07f7f7a Michael Hanselmann
2485 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
2486 e07f7f7a Michael Hanselmann
    """Returns a list of jobs in queue.
2487 e07f7f7a Michael Hanselmann

2488 e07f7f7a Michael Hanselmann
    @type fields: sequence
2489 e07f7f7a Michael Hanselmann
    @param fields: List of wanted fields
2490 e07f7f7a Michael Hanselmann
    @type qfilter: None or query2 filter (list)
2491 e07f7f7a Michael Hanselmann
    @param qfilter: Query filter
2492 e07f7f7a Michael Hanselmann

2493 e07f7f7a Michael Hanselmann
    """
2494 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2495 e07f7f7a Michael Hanselmann
2496 76b62028 Iustin Pop
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2497 e07f7f7a Michael Hanselmann
2498 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
2499 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
2500 e2715f69 Michael Hanselmann

2501 ea03467c Iustin Pop
    @type job_ids: list
2502 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
2503 ea03467c Iustin Pop
    @type fields: list
2504 ea03467c Iustin Pop
    @param fields: names of fields to return
2505 ea03467c Iustin Pop
    @rtype: list
2506 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
2507 ea03467c Iustin Pop
        the requested fields
2508 e2715f69 Michael Hanselmann

2509 e2715f69 Michael Hanselmann
    """
2510 76b62028 Iustin Pop
    # backwards compat:
2511 76b62028 Iustin Pop
    job_ids = [int(jid) for jid in job_ids]
2512 e07f7f7a Michael Hanselmann
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2513 e2715f69 Michael Hanselmann
2514 76b62028 Iustin Pop
    (qobj, ctx, _) = self._Query(fields, qfilter)
2515 e2715f69 Michael Hanselmann
2516 76b62028 Iustin Pop
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2517 e2715f69 Michael Hanselmann
2518 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
2519 6d5ea385 Michael Hanselmann
  def PrepareShutdown(self):
2520 6d5ea385 Michael Hanselmann
    """Prepare to stop the job queue.
2521 6d5ea385 Michael Hanselmann

2522 6d5ea385 Michael Hanselmann
    Disables execution of jobs in the workerpool and returns whether there are
2523 6d5ea385 Michael Hanselmann
    any jobs currently running. If the latter is the case, the job queue is not
2524 6d5ea385 Michael Hanselmann
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2525 6d5ea385 Michael Hanselmann
    be called without interfering with any job. Queued and unfinished jobs will
2526 6d5ea385 Michael Hanselmann
    be resumed next time.
2527 6d5ea385 Michael Hanselmann

2528 6d5ea385 Michael Hanselmann
    Once this function has been called no new job submissions will be accepted
2529 6d5ea385 Michael Hanselmann
    (see L{_RequireNonDrainedQueue}).
2530 6d5ea385 Michael Hanselmann

2531 6d5ea385 Michael Hanselmann
    @rtype: bool
2532 6d5ea385 Michael Hanselmann
    @return: Whether there are any running jobs
2533 6d5ea385 Michael Hanselmann

2534 6d5ea385 Michael Hanselmann
    """
2535 6d5ea385 Michael Hanselmann
    if self._accepting_jobs:
2536 6d5ea385 Michael Hanselmann
      self._accepting_jobs = False
2537 6d5ea385 Michael Hanselmann
2538 6d5ea385 Michael Hanselmann
      # Tell worker pool to stop processing pending tasks
2539 6d5ea385 Michael Hanselmann
      self._wpool.SetActive(False)
2540 6d5ea385 Michael Hanselmann
2541 6d5ea385 Michael Hanselmann
    return self._wpool.HasRunningTasks()
2542 6d5ea385 Michael Hanselmann
2543 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2544 db37da70 Michael Hanselmann
  @_RequireOpenQueue
2545 e2715f69 Michael Hanselmann
  def Shutdown(self):
2546 e2715f69 Michael Hanselmann
    """Stops the job queue.
2547 e2715f69 Michael Hanselmann

2548 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2549 ea03467c Iustin Pop

2550 e2715f69 Michael Hanselmann
    """
2551 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
2552 85f03e0d Michael Hanselmann
2553 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
2554 a71f9c7d Guido Trotter
    self._queue_filelock = None