Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue / __init__.py @ 58e4df3c

History | View | Annotate | Download (76.2 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 95a4e33f Hrvoje Ribicic
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1048938 Iustin Pop
import time
35 5685c1a5 Michael Hanselmann
import weakref
36 b95479a5 Michael Hanselmann
import threading
37 dfc8824a Michael Hanselmann
import itertools
38 99fb250b Michael Hanselmann
import operator
39 ea0a6023 Petr Pudlak
import os
40 498ae1cc Iustin Pop
41 6c2549d6 Guido Trotter
try:
42 b459a848 Andrea Spadaccini
  # pylint: disable=E0611
43 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
44 6c2549d6 Guido Trotter
except ImportError:
45 6c2549d6 Guido Trotter
  import pyinotify
46 6c2549d6 Guido Trotter
47 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
48 e2715f69 Michael Hanselmann
from ganeti import constants
49 f1da30e6 Michael Hanselmann
from ganeti import serializer
50 e2715f69 Michael Hanselmann
from ganeti import workerpool
51 99bd4f0a Guido Trotter
from ganeti import locking
52 704b51ff Klaus Aehlig
from ganeti import luxi
53 f1da30e6 Michael Hanselmann
from ganeti import opcodes
54 580b1fdd Jose A. Lopes
from ganeti import opcodes_base
55 7a1ecaed Iustin Pop
from ganeti import errors
56 e2715f69 Michael Hanselmann
from ganeti import mcpu
57 7996a135 Iustin Pop
from ganeti import utils
58 04ab05ce Michael Hanselmann
from ganeti import jstore
59 4869595d Petr Pudlak
import ganeti.rpc.node as rpc
60 82b22e19 René Nussbaumer
from ganeti import runtime
61 a744b676 Manuel Franceschini
from ganeti import netutils
62 989a8bee Michael Hanselmann
from ganeti import compat
63 b95479a5 Michael Hanselmann
from ganeti import ht
64 a06c6ae8 Michael Hanselmann
from ganeti import query
65 a06c6ae8 Michael Hanselmann
from ganeti import qlang
66 e2b4a7ba Michael Hanselmann
from ganeti import pathutils
67 cffbbae7 Michael Hanselmann
from ganeti import vcluster
68 e2715f69 Michael Hanselmann
69 fbf0262f Michael Hanselmann
70 942817f2 Petr Pudlak
#: Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 1

# Member lock names to be passed to the @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves the "id" attribute of the object passed in
_GetIdAttr = operator.attrgetter("id")
78 99fb250b Michael Hanselmann
79 498ae1cc Iustin Pop
80 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Special exception to cancel a job.

  Carries no additional state; raising it is the whole signal.

  """
84 fbf0262f Michael Hanselmann
85 fbf0262f Michael Hanselmann
86 942e2262 Michael Hanselmann
class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  Carries no additional state; raising it is the whole signal.

  """
90 942e2262 Michael Hanselmann
91 942e2262 Michael Hanselmann
92 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is converted using L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
100 70552c46 Michael Hanselmann
101 70552c46 Michael Hanselmann
102 cffbbae7 Michael Hanselmann
def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  @param runner: object providing the C{call_jobqueue_update} method
  @param names: passed unchanged as first argument to
      C{call_jobqueue_update} (presumably the names of the target nodes;
      verify against callers)
  @param file_name: path of the job queue file; it is converted using
      L{vcluster.MakeVirtualPath} before being passed on
  @param content: passed unchanged as last argument to
      C{call_jobqueue_update}
  @return: the return value of C{runner.call_jobqueue_update}

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)
108 cffbbae7 Michael Hanselmann
109 cffbbae7 Michael Hanselmann
110 a06c6ae8 Michael Hanselmann
class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    @param fields: names of the job fields to query; used together with
        C{query.JOB_FIELDS} to build the cached L{query.Query} object

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    @param job: job object; its C{id} attribute is used as the key of the
        single entry passed to the query
    @return: the first element of the result of C{OldStyleQuery}

    """
    # Exactly one (id, job) pair is passed in, only its entry is wanted
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
127 a06c6ae8 Michael Hanselmann
128 a06c6ae8 Michael Hanselmann
129 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar priority: the current priority; initialized from the opcode, but
  stored separately from it
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The keys C{input}, C{status}, C{result} and C{log} are required; the
    timestamps default to C{None} and the priority to
    C{constants.OP_PRIO_DEFAULT} when missing.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # __init__ is bypassed on purpose, all attributes come from the state
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    The encapsulated opcode is serialized through its C{__getstate__}
    method; all other attributes are stored as they are.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
202 f1048938 Iustin Pop
203 e2715f69 Michael Hanselmann
204 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "processor_lock", "writable", "archived",
               "livelock", "process_id",
               "__weakref__"]

  def AddReasons(self, pickup=False):
    """Extend the reason trail

    Add the reason for all the opcodes of this job to be executed.

    @type pickup: bool
    @param pickup: selects the reason source prefix; if true,
        C{constants.OPCODE_REASON_SRC_PICKUP} is used, otherwise
        C{constants.OPCODE_REASON_SRC_OPCODE}

    """
    # The prefix is the same for all opcodes, hence determine it only once
    if pickup:
      reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
    else:
      reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE

    for (idx, queued_op) in enumerate(self.ops):
      op = queued_op.input
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                                reason_src_prefix)
      reason_text = "job=%d;index=%d" % (self.id, idx)
      # Opcodes without a reason trail start with an empty one
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified
    @raise errors.GenericError: if no opcodes were passed

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False
    self.livelock = None
    self.process_id = None

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    These are the attributes which are not part of the serialized state.

    @param obj: the job object to initialize
    @type writable: bool
    @param writable: Whether job can be modified

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    """Returns a description containing class name, job ID and opcodes.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # __init__ is bypassed on purpose, all attributes come from the state
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived
    obj.livelock = state.get("livelock", None)
    obj.process_id = state.get("process_id", None)
    if obj.process_id is not None:
      obj.process_id = int(obj.process_id)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide log serial is the highest one found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    In-memory only attributes (see L{_InitInMemory}), the parent queue and
    the archived flag are not part of the result.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      "livelock": self.livelock,
      "process_id": self.process_id,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped
      - the first opcode found which is canceling, canceled or finished
        with error determines the job status and stops the iteration
      - otherwise, the last opcode which is waiting or running determines
        the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job-level status, like all the other branches do
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int
    @return: the numerically lowest priority value of all unfinished opcodes

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    # The serial is the first element of each log entry
    return [entry
            for op in self.ops
            for entry in op.log
            if entry[0] > serial]

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    Only the end timestamp is set, opcodes are not touched.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    Queued jobs are canceled and finalized right away, waiting jobs are
    only marked as canceling; jobs in any other status are not changed.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    Only opcodes which are queued or waiting are changed; running and
    finalized opcodes keep their priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)

  def SetPid(self, pid):
    """Sets the job's process ID

    The process ID is only changed for queued or waiting jobs; in any
    other status a warning is logged and the job is left as it is.

    @type pid: int
    @param pid: the process ID

    """
    status = self.CalcStatus()

    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_WAITING):
      if self.process_id is not None:
        logging.warning("Replacing the process id %s of job %s with %s",
                        self.process_id, self.id, pid)
      self.process_id = pid
    else:
      logging.warning("Can set pid only for queued/waiting jobs")
581 ea0a6023 Petr Pudlak
582 f1048938 Iustin Pop
583 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Remembers the queue, job and opcode the callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts processing if the opcode is canceling or the queue shuts down.

    @raise CancelJob: if the opcode status is OP_STATUS_CANCELING
    @raise QueueShutdown: if the queue no longer accepts jobs

    """
    # A pending cancel request takes precedence over the shutdown check
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # No cancel request; stop anyway if the queue is going away
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from lock-waiting to running.

    The mcpu code invokes this notifier right before the LU's Exec() method
    is started. For the status change to be visible to end users, the opcode
    has to be set to OP_STATUS_WAITING before Processor.ExecOpCode is called.

    """
    op = self._op
    job = self._job

    assert op in job.ops
    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Last chance to honour a cancel request or queue shutdown
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Write out the job with its new status
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds one log entry to the opcode; wrapped by the queue lock decorator.

    The job's log serial is incremented and used as the entry's serial, then
    the job is updated with C{replicate=False}.

    @param timestamp: Timestamp to store with the entry
    @param log_type: Log entry type
    @param log_msg: Log message

    """
    job = self._job
    job.log_serial += 1

    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Appends a log entry to the opcode.

    Accepts either a single argument (the message, logged with type
    C{constants.ELOG_MESSAGE}) or two arguments (log type and message).

    """
    assert len(args) < 3

    if len(args) != 1:
      (log_type, log_msg) = args
    else:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])

    # Splitting the time makes serialization easier and avoids losing
    # precision.
    now = utils.SplitTime(time.time())
    self._AppendFeedback(now, log_type, log_msg)

  def CurrentPriority(self):
    """Returns the opcode's current priority.

    Also acts as a cancellation point, see L{_CheckCancel}.

    """
    op = self._op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # Honour a cancel request or queue shutdown before reporting
    self._CheckCancel()

    return op.priority

  def SubmitManyJobs(self, jobs):
    """Hands jobs over to the queue for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # The job queue takes care of locking
    return self._queue.SubmitManyJobs(jobs)
688 6a373640 Michael Hanselmann
689 031a3e57 Michael Hanselmann
690 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Prepares a checker comparing a job against a previously seen state.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    (self._prev_job_info, self._prev_log_serial) = \
      (prev_job_info, prev_log_serial)

  def __call__(self, job):
    """Determines whether a job differs from the previously seen state.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: Tuple of job info and new log entries if the job has changed or
      is no longer queued, waiting or running; C{None} otherwise

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    raw_info = self._squery(job)
    raw_log = job.GetLogEntries(self._prev_log_serial)

    # A serialization round-trip can change types (e.g. tuple becomes list)
    # or lose precision. The data received from the client went through the
    # same transformation, so it's applied here as well; otherwise the
    # comparison below might report differences which aren't significant.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    (job_info, log_entries) = [serializer.LoadJson(serializer.DumpJson(data))
                               for data in (raw_info, raw_log)]

    pending_states = (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING)

    if status not in pending_states:
      # Job is no longer running, there won't be any further changes and
      # hence no point in waiting
      changed = True
    elif job_info != self._prev_job_info:
      changed = True
    else:
      changed = bool(log_entries and
                     self._prev_log_serial != log_entries[0][0])

    if not changed:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
742 989a8bee Michael Hanselmann
743 989a8bee Michael Hanselmann
744 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Sets up an inotify watch on a job file.

    @type filename: string
    @param filename: Path to job file
    @param _inotify_wm_cls: Watch manager class, replaceable for testing
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    watch_manager = _inotify_wm_cls()
    self._wm = watch_manager

    handler = asyncnotifier.SingleFileEventHandler(watch_manager,
                                                   self._OnInotify, filename)
    self._inotify_handler = handler

    self._notifier = pyinotify.Notifier(watch_manager,
                                        default_proc_fun=handler)
    try:
      handler.enable()
    except Exception:
      # File descriptors aren't closed automatically by pyinotify
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Inotify callback; re-enables the handler if it got disabled.

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Blocks until the job file changes or the timeout expires.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is passed on multiplied by 1000
    got_events = notifier.check_events(timeout * 1000)
    if got_events:
      notifier.read_events()
    notifier.process_events()

    return got_events

  def Close(self):
    """Stops the underlying notifier, closing its file descriptor.

    """
    self._notifier.stop()
792 989a8bee Michael Hanselmann
793 989a8bee Michael Hanselmann
794 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Prepares a waiter whose file watch is only created on demand.

    @type filename: string
    @param filename: Path to job file
    @param _waiter_cls: Class used to create the file waiter, replaceable for
      testing

    """
    self._filename = filename
    self._waiter_cls = _waiter_cls
    self._filewaiter = None

  def Wait(self, timeout):
    """Waits for a job to change.

    The first call only creates the file waiter and returns right away;
    subsequent calls wait on the file waiter.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if not self._filewaiter:
      # Lazy setup: the cost of setting up inotify is avoided if the job file
      # had already changed. Returning immediately lets the caller check the
      # job file once more for changes made since its last check, thereby
      # avoiding a race condition.
      self._filewaiter = self._waiter_cls(self._filename)
      return True

    return self._filewaiter.Wait(timeout)

  def Close(self):
    """Closes the file waiter if one was created.

    """
    filewaiter = self._filewaiter
    if filewaiter:
      filewaiter.Close()
831 989a8bee Michael Hanselmann
832 989a8bee Michael Hanselmann
833 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  Given the previous job status and log serial, instances notify the client
  once the current job status differs from them.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    """Loads the job once and checks it for changes.

    @param counter: Iterator counting the checks made so far
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type check_fn: callable
    @param check_fn: Function returning C{None} for an unchanged job
    @return: Whatever C{check_fn} returned, unless it was C{None}
    @raise errors.JobLost: if the job could not be loaded
    @raise utils.RetryAgain: if the job has not changed

    """
    is_recheck = counter.next() > 0
    if is_recheck:
      # Every check but the first one gives the job some more time to change
      # again, which performs better for jobs producing lots of
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    changes = check_fn(job)
    if changes is not None:
      return changes

    raise utils.RetryAgain()

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @param _waiter_cls: Class used to create the waiter, replaceable for
      testing
    @return: The changes found by L{_JobChangesChecker}, C{None} if the job
      was lost or C{constants.JOB_NOTCHANGED} if the timeout expired

    """
    checks_done = itertools.count()
    try:
      checker = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        poll_fn = compat.partial(self._CheckForChanges,
                                 checks_done, job_load_fn, checker)
        return utils.Retry(poll_fn, utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        # Release the waiter no matter how the retry loop ended
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
892 6c2549d6 Guido Trotter
893 6c2549d6 Guido Trotter
894 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Errors which aren't instances of L{errors.GenericError} are converted to
  L{errors.OpExecError} carrying their string representation before being
  encoded.

  @param err: The error to encode
  @return: Result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
904 6760e4ed Michael Hanselmann
905 6760e4ed Michael Hanselmann
906 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
907 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
908 26d3fd2f Michael Hanselmann
    """Initializes this class.
909 26d3fd2f Michael Hanselmann

910 26d3fd2f Michael Hanselmann
    """
911 26d3fd2f Michael Hanselmann
    self._fn = fn
912 26d3fd2f Michael Hanselmann
    self._next = None
913 26d3fd2f Michael Hanselmann
914 26d3fd2f Michael Hanselmann
  def _Advance(self):
915 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
916 26d3fd2f Michael Hanselmann

917 26d3fd2f Michael Hanselmann
    """
918 26d3fd2f Michael Hanselmann
    if self._next is None:
919 26d3fd2f Michael Hanselmann
      self._next = self._fn()
920 26d3fd2f Michael Hanselmann
921 26d3fd2f Michael Hanselmann
  def Peek(self):
922 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
923 26d3fd2f Michael Hanselmann

924 26d3fd2f Michael Hanselmann
    """
925 26d3fd2f Michael Hanselmann
    self._Advance()
926 26d3fd2f Michael Hanselmann
    return self._next
927 26d3fd2f Michael Hanselmann
928 26d3fd2f Michael Hanselmann
  def Next(self):
929 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
930 26d3fd2f Michael Hanselmann

931 26d3fd2f Michael Hanselmann
    """
932 26d3fd2f Michael Hanselmann
    self._Advance()
933 26d3fd2f Michael Hanselmann
    result = self._next
934 26d3fd2f Michael Hanselmann
    self._next = None
935 26d3fd2f Michael Hanselmann
    return result
936 26d3fd2f Michael Hanselmann
937 26d3fd2f Michael Hanselmann
938 b80cc518 Michael Hanselmann
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Sets up the execution context for a single opcode.

    @param op: Queued opcode; its C{input} attribute provides the summary
      and, if any, the job dependencies
    @param index: Index of the opcode
    @param log_prefix: Prefix for log messages concerning this opcode
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Dependencies are kept as a local copy which can be modified
    has_deps = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
    self.jobdeps = op.input.depends[:] if has_deps else None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Replaces the timeout strategy with a newly created one.

    """
    strategy = self._timeout_strategy_factory()
    self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: Whether the opcode's priority was increased

    """
    op = self.op

    # Only act once all retries are used up, i.e. the next lock acquire
    # would be a blocking one, and there's still a higher priority left
    out_of_retries = self._timeout_strategy.Peek() is None
    if not (out_of_retries and op.priority > constants.OP_PRIO_HIGHEST):
      return False

    logging.debug("Increasing priority")
    op.priority -= 1
    self._ResetTimeoutStrategy()
    return True

  def GetNextLockTimeout(self):
    """Returns the timeout to use for the next lock acquire.

    """
    return self._timeout_strategy.Next()
988 26d3fd2f Michael Hanselmann
989 b80cc518 Michael Hanselmann
990 be760ba8 Michael Hanselmann
class _JobProcessor(object):
991 75d81fc8 Michael Hanselmann
  (DEFER,
992 75d81fc8 Michael Hanselmann
   WAITDEP,
993 75d81fc8 Michael Hanselmann
   FINISHED) = range(1, 4)
994 75d81fc8 Michael Hanselmann
995 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores the objects needed for processing a job.

    @param queue: Job queue
    @param opexec_fn: Stored as C{opexec_fn}; presumably the callable used to
      execute opcodes
    @param job: Job object
    @param _timeout_strategy_factory: Callable to create new timeout
      strategies, replaceable for testing

    """
    (self.queue, self.opexec_fn, self.job) = (queue, opexec_fn, job)
    self._timeout_strategy_factory = _timeout_strategy_factory
1004 be760ba8 Michael Hanselmann
1005 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Context for the first opcode which isn't finalized yet
    @raise errors.ProgrammerError: if an opcode is marked as running or no
      opcodes are left

    """
    # The iterator is stored on the job and acts as a sort of cache: future
    # lookups continue where this one stopped
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # An opcode is already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
      opctx = _OpExecContext(op, idx, log_prefix, timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # The job was partially completed before the master daemon was shut
      # down, hence some opcodes can be expected to have completed
      # successfully already (had any of them failed, the whole job should
      # have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

    # Ran out of opcodes without finding one to run
    raise errors.ProgrammerError("Called for a finished job")
1044 be760ba8 Michael Hanselmann
1045 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    Any previous result of the opcode is cleared. The start timestamps of
    opcode and job are set if they haven't been set before.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or a timestamp was changed

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    op.result = None

    status_changed = (op.status == constants.OP_STATUS_QUEUED)
    if status_changed:
      op.status = constants.OP_STATUS_WAITING

    op_started = (op.start_timestamp is None)
    if op_started:
      op.start_timestamp = TimeStampNow()

    job_started = (job.start_timestamp is None)
    if job_started:
      # The job starts with its first opcode
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITING

    return status_changed or op_started or job_started
1080 be760ba8 Michael Hanselmann
1081 b95479a5 Michael Hanselmann
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    Dependencies are checked in order; fulfilled ones are removed from
    C{opctx.jobdeps}. Processing stops at the first dependency which isn't
    fulfilled.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (outcome, message) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                         dep_status)
      assert ht.TNonEmptyString(message), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, message)

      if outcome == _JobDependencyManager.CONTINUE:
        # Dependency is fulfilled, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if outcome == _JobDependencyManager.WAIT:
        # A notification is needed first; the dependency tracker is going to
        # re-add the job to the workerpool
        return True

      if outcome == _JobDependencyManager.CANCEL:
        # The other job was cancelled, so this one is cancelled too
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if outcome in (_JobDependencyManager.WRONGSTATUS,
                     _JobDependencyManager.ERROR):
        # The other job failed or an error occurred, this job has to fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(message))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   outcome)

    # No dependencies (left) to wait for
    return False
1136 b95479a5 Michael Hanselmann
1137 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
1138 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
1139 be760ba8 Michael Hanselmann

1140 be760ba8 Michael Hanselmann
    """
1141 b80cc518 Michael Hanselmann
    op = opctx.op
1142 b80cc518 Michael Hanselmann
1143 e6e17529 Hrvoje Ribicic
    assert op.status in (constants.OP_STATUS_WAITING,
1144 e6e17529 Hrvoje Ribicic
                         constants.OP_STATUS_CANCELING)
1145 e6e17529 Hrvoje Ribicic
1146 e6e17529 Hrvoje Ribicic
    # The very last check if the job was cancelled before trying to execute
1147 e6e17529 Hrvoje Ribicic
    if op.status == constants.OP_STATUS_CANCELING:
1148 e6e17529 Hrvoje Ribicic
      return (constants.OP_STATUS_CANCELING, None)
1149 be760ba8 Michael Hanselmann
1150 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
1151 26d3fd2f Michael Hanselmann
1152 be760ba8 Michael Hanselmann
    try:
1153 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
1154 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
1155 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
1156 e4e59de8 Michael Hanselmann
                              timeout=timeout)
1157 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
1158 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
1159 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1160 9e49dfc5 Michael Hanselmann
1161 47099cd1 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITING,
1162 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1163 9e49dfc5 Michael Hanselmann
1164 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
1165 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
1166 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
1167 9e49dfc5 Michael Hanselmann
1168 942e2262 Michael Hanselmann
      # Queue is shutting down, return to queued
1169 942e2262 Michael Hanselmann
      if not self.queue.AcceptingJobsUnlocked():
1170 942e2262 Michael Hanselmann
        return (constants.OP_STATUS_QUEUED, None)
1171 942e2262 Michael Hanselmann
1172 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
1173 47099cd1 Michael Hanselmann
      return (constants.OP_STATUS_WAITING, None)
1174 be760ba8 Michael Hanselmann
    except CancelJob:
1175 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
1176 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
1177 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
1178 942e2262 Michael Hanselmann
1179 942e2262 Michael Hanselmann
    except QueueShutdown:
1180 942e2262 Michael Hanselmann
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1181 942e2262 Michael Hanselmann
1182 942e2262 Michael Hanselmann
      assert op.status == constants.OP_STATUS_WAITING
1183 942e2262 Michael Hanselmann
1184 942e2262 Michael Hanselmann
      # Job hadn't been started yet, so it should return to the queue
1185 942e2262 Michael Hanselmann
      return (constants.OP_STATUS_QUEUED, None)
1186 942e2262 Michael Hanselmann
1187 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
1188 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
1189 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
1190 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1191 be760ba8 Michael Hanselmann
    else:
1192 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
1193 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
1194 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
1195 be760ba8 Michael Hanselmann
1196 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
1197 be760ba8 Michael Hanselmann
    """Continues execution of a job.
1198 be760ba8 Michael Hanselmann

1199 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
1200 75d81fc8 Michael Hanselmann
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1201 75d81fc8 Michael Hanselmann
      be deferred and C{WAITDEP} if the dependency manager
1202 75d81fc8 Michael Hanselmann
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1203 be760ba8 Michael Hanselmann

1204 be760ba8 Michael Hanselmann
    """
1205 be760ba8 Michael Hanselmann
    queue = self.queue
1206 be760ba8 Michael Hanselmann
    job = self.job
1207 be760ba8 Michael Hanselmann
1208 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
1209 be760ba8 Michael Hanselmann
1210 be760ba8 Michael Hanselmann
    # The queue lock is held in shared mode for all bookkeeping in this
    # method; it is only dropped around the actual opcode execution below
    queue.acquire(shared=1)
1211 be760ba8 Michael Hanselmann
    try:
1212 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
1213 be760ba8 Michael Hanselmann
1214 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Expected writable job"
1215 c0f6d0d8 Michael Hanselmann
1216 66bd7445 Michael Hanselmann
      # Don't do anything for finalized jobs
1217 66bd7445 Michael Hanselmann
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1218 75d81fc8 Michael Hanselmann
        return self.FINISHED
1219 66bd7445 Michael Hanselmann
1220 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
1221 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
1222 26d3fd2f Michael Hanselmann
        # Take over the saved context and clear it on the job; it is stored
        # again further down if the opcode needs another round
        opctx = job.cur_opctx
1223 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
1224 26d3fd2f Michael Hanselmann
      else:
1225 26d3fd2f Michael Hanselmann
        # Test hook, skipped when Python runs with assertions disabled
        if __debug__ and _nextop_fn:
1226 26d3fd2f Michael Hanselmann
          _nextop_fn()
1227 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1228 26d3fd2f Michael Hanselmann
1229 b80cc518 Michael Hanselmann
      op = opctx.op
1230 be760ba8 Michael Hanselmann
1231 be760ba8 Michael Hanselmann
      # Consistency check
1232 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1233 66bd7445 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING)
1234 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
1235 be760ba8 Michael Hanselmann
1236 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
1237 47099cd1 Michael Hanselmann
                           constants.OP_STATUS_WAITING,
1238 66bd7445 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
1239 be760ba8 Michael Hanselmann
1240 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1241 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1242 26d3fd2f Michael Hanselmann
1243 b95479a5 Michael Hanselmann
      # Becomes true if the dependency check reports that the dependency
      # manager is going to re-schedule this job
      waitjob = None
1244 b95479a5 Michael Hanselmann
1245 66bd7445 Michael Hanselmann
      if op.status != constants.OP_STATUS_CANCELING:
1246 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1247 47099cd1 Michael Hanselmann
                             constants.OP_STATUS_WAITING)
1248 30c945d0 Michael Hanselmann
1249 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1250 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1251 5fd6b694 Michael Hanselmann
          # Write to disk
1252 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1253 be760ba8 Michael Hanselmann
1254 47099cd1 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITING
1255 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1256 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1257 b95479a5 Michael Hanselmann
        assert waitjob is None
1258 b95479a5 Michael Hanselmann
1259 b95479a5 Michael Hanselmann
        # Check if waiting for a job is necessary
1260 b95479a5 Michael Hanselmann
        waitjob = self._CheckDependencies(queue, job, opctx)
1261 be760ba8 Michael Hanselmann
1262 47099cd1 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_WAITING,
1263 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_CANCELING,
1264 b95479a5 Michael Hanselmann
                             constants.OP_STATUS_ERROR)
1265 be760ba8 Michael Hanselmann
1266 b95479a5 Michael Hanselmann
        # Execute only if no dependency is outstanding and the dependency
        # check neither cancelled the job nor failed the opcode
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1267 b95479a5 Michael Hanselmann
                                         constants.OP_STATUS_ERROR)):
1268 b95479a5 Michael Hanselmann
          logging.info("%s: opcode %s waiting for locks",
1269 b95479a5 Michael Hanselmann
                       opctx.log_prefix, opctx.summary)
1270 be760ba8 Michael Hanselmann
1271 b95479a5 Michael Hanselmann
          assert not opctx.jobdeps, "Not all dependencies were removed"
1272 b95479a5 Michael Hanselmann
1273 b95479a5 Michael Hanselmann
          # Drop the queue lock while the opcode runs; the "finally" clause
          # re-acquires it even if the execution raises
          queue.release()
1274 b95479a5 Michael Hanselmann
          try:
1275 b95479a5 Michael Hanselmann
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1276 b95479a5 Michael Hanselmann
          finally:
1277 b95479a5 Michael Hanselmann
            queue.acquire(shared=1)
1278 b95479a5 Michael Hanselmann
1279 b95479a5 Michael Hanselmann
          # Record the outcome reported by the execution on the opcode
          op.status = op_status
1280 b95479a5 Michael Hanselmann
          op.result = op_result
1281 b95479a5 Michael Hanselmann
1282 b95479a5 Michael Hanselmann
          assert not waitjob
1283 be760ba8 Michael Hanselmann
1284 942e2262 Michael Hanselmann
        if op.status in (constants.OP_STATUS_WAITING,
1285 942e2262 Michael Hanselmann
                         constants.OP_STATUS_QUEUED):
1286 942e2262 Michael Hanselmann
          # waiting: Couldn't get locks in time
1287 942e2262 Michael Hanselmann
          # queued: Queue is shutting down
1288 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1289 be760ba8 Michael Hanselmann
        else:
1290 26d3fd2f Michael Hanselmann
          # Finalize opcode
1291 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1292 be760ba8 Michael Hanselmann
1293 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1294 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1295 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1296 26d3fd2f Michael Hanselmann
          else:
1297 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1298 be760ba8 Michael Hanselmann
1299 942e2262 Michael Hanselmann
      # From here on the opcode status decides what happens to the job
      if op.status == constants.OP_STATUS_QUEUED:
1300 942e2262 Michael Hanselmann
        # Queue is shutting down
1301 942e2262 Michael Hanselmann
        assert not waitjob
1302 942e2262 Michael Hanselmann
1303 942e2262 Michael Hanselmann
        finalize = False
1304 942e2262 Michael Hanselmann
1305 942e2262 Michael Hanselmann
        # Reset context
1306 942e2262 Michael Hanselmann
        job.cur_opctx = None
1307 942e2262 Michael Hanselmann
1308 942e2262 Michael Hanselmann
        # In no case must the status be finalized here
1309 942e2262 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1310 942e2262 Michael Hanselmann
1311 942e2262 Michael Hanselmann
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1312 be760ba8 Michael Hanselmann
        finalize = False
1313 be760ba8 Michael Hanselmann
1314 b95479a5 Michael Hanselmann
        # The priority is only re-evaluated when the opcode waited for
        # locks, not when it is waiting for a dependency
        if not waitjob and opctx.CheckPriorityIncrease():
1315 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1316 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1317 be760ba8 Michael Hanselmann
1318 26d3fd2f Michael Hanselmann
        # Keep around for another round
1319 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1320 be760ba8 Michael Hanselmann
1321 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1322 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1323 be760ba8 Michael Hanselmann
1324 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1325 47099cd1 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1326 be760ba8 Michael Hanselmann
1327 be760ba8 Michael Hanselmann
      else:
1328 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1329 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1330 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1331 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1332 26d3fd2f Michael Hanselmann
1333 26d3fd2f Michael Hanselmann
        # Reset context
1334 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1335 26d3fd2f Michael Hanselmann
1336 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1337 26d3fd2f Michael Hanselmann
          # Not final by itself; the last-opcode check below decides
          finalize = False
1338 26d3fd2f Michael Hanselmann
1339 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1340 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1341 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1342 26d3fd2f Michael Hanselmann
1343 26d3fd2f Michael Hanselmann
          # The failure ends the job: remaining opcodes are marked as failed
          # as well (verified by the consistency check below)
          to_encode = errors.OpExecError("Preceding opcode failed")
1344 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1345 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1346 26d3fd2f Michael Hanselmann
          finalize = True
1347 be760ba8 Michael Hanselmann
1348 26d3fd2f Michael Hanselmann
          # Consistency check
1349 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1350 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1351 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1352 be760ba8 Michael Hanselmann
1353 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1354 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1355 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1356 26d3fd2f Michael Hanselmann
          finalize = True
1357 26d3fd2f Michael Hanselmann
1358 26d3fd2f Michael Hanselmann
        else:
1359 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1360 26d3fd2f Michael Hanselmann
1361 66bd7445 Michael Hanselmann
        if opctx.index == (opcount - 1):
1362 66bd7445 Michael Hanselmann
          # Finalize on last opcode
1363 66bd7445 Michael Hanselmann
          finalize = True
1364 66bd7445 Michael Hanselmann
1365 66bd7445 Michael Hanselmann
        if finalize:
1366 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1367 66bd7445 Michael Hanselmann
          job.Finalize()
1368 26d3fd2f Michael Hanselmann
1369 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1370 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1371 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1372 be760ba8 Michael Hanselmann
1373 b95479a5 Michael Hanselmann
        assert not waitjob
1374 b95479a5 Michael Hanselmann
1375 66bd7445 Michael Hanselmann
        if finalize:
1376 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1377 75d81fc8 Michael Hanselmann
          return self.FINISHED
1378 be760ba8 Michael Hanselmann
1379 b95479a5 Michael Hanselmann
      assert not waitjob or queue.depmgr.JobWaiting(job)
1380 b95479a5 Michael Hanselmann
1381 75d81fc8 Michael Hanselmann
      # Job is not finished: WAITDEP if the dependency manager re-schedules
      # it, DEFER otherwise
      if waitjob:
1382 75d81fc8 Michael Hanselmann
        return self.WAITDEP
1383 75d81fc8 Michael Hanselmann
      else:
1384 75d81fc8 Michael Hanselmann
        return self.DEFER
1385 be760ba8 Michael Hanselmann
    finally:
1386 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Job became read-only while being processed"
1387 be760ba8 Michael Hanselmann
      # Matches the shared acquire at the top of this method
      queue.release()
1388 be760ba8 Michael Hanselmann
1389 be760ba8 Michael Hanselmann
1390 df5a5730 Michael Hanselmann
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts on the value returned by a L{_JobProcessor} run.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
    once the job has finished
  @param job: Job which was processed
  @param result: Value returned by the job processor
  @raise workerpool.DeferTask: if the job has to be scheduled again
  @raise errors.ProgrammerError: if C{result} is not a known status

  """
  if result == _JobProcessor.FINISHED:
    # Jobs depending on this one can be woken up now
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Hand the job back to the pool using its current priority
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do here, the dependency manager re-schedules the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1411 df5a5730 Michael Hanselmann
1412 df5a5730 Michael Hanselmann
1413 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker processing jobs handed out by the job queue worker pool.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Processes a job while holding its per-job processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # A job may show up in the task list more than once: if it registers for
    # a dependency and the dependency notifies before the first worker is
    # done, it is re-added. The per-job lock keeps a second worker out.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Runs the job processor for a job and evaluates its result.

    Must be called with per-job lock acquired.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Names the worker task after the job and, if given, the opcode.

      """
      return self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Opcode execution goes through a wrapper updating the thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                                    proc.ExecOpCode)

    depmgr = queue.depmgr
    proc_result = _JobProcessor(queue, wrap_execop_fn, job)()

    _EvaluateJobProcessorResult(depmgr, job, proc_result)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode with the worker thread named after it.

    The name is reset, by calling C{setname_fn} with C{None}, once the
    execution has ended, whether it succeeded or raised.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: Opcode, passed on to both callables
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name for a job and an optional opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string
    @return: C{Job<id>}, followed by a slash and the opcode's tiny summary
      if an opcode was given

    """
    name = "Job%s" % job.id

    if op:
      name = "/".join([name, op.TinySummary()])

    return name
1485 0aeeb6e3 Michael Hanselmann
1486 e2715f69 Michael Hanselmann
1487 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool for job processing, tied to one job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it belongs to.

    @param queue: Queue object, made available as C{self.queue}

    """
    # Pool name, number of workers and worker class
    pool_args = ("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    super(_JobQueueWorkerPool, self).__init__(*pool_args)
    self.queue = queue
1496 e2715f69 Michael Hanselmann
1497 e2715f69 Michael Hanselmann
1498 b95479a5 Michael Hanselmann
class _JobDependencyManager:
1499 b95479a5 Michael Hanselmann
  """Keeps track of job dependencies.
1500 b95479a5 Michael Hanselmann

1501 b95479a5 Michael Hanselmann
  """
1502 b95479a5 Michael Hanselmann
  (WAIT,
1503 b95479a5 Michael Hanselmann
   ERROR,
1504 b95479a5 Michael Hanselmann
   CANCEL,
1505 b95479a5 Michael Hanselmann
   CONTINUE,
1506 b95479a5 Michael Hanselmann
   WRONGSTATUS) = range(1, 6)
1507 b95479a5 Michael Hanselmann
1508 b95479a5 Michael Hanselmann
  def __init__(self, getstatus_fn, enqueue_fn):
1509 b95479a5 Michael Hanselmann
    """Initializes this class.
1510 b95479a5 Michael Hanselmann

1511 b95479a5 Michael Hanselmann
    """
1512 b95479a5 Michael Hanselmann
    self._getstatus_fn = getstatus_fn
1513 b95479a5 Michael Hanselmann
    self._enqueue_fn = enqueue_fn
1514 b95479a5 Michael Hanselmann
1515 b95479a5 Michael Hanselmann
    self._waiters = {}
1516 b95479a5 Michael Hanselmann
    self._lock = locking.SharedLock("JobDepMgr")
1517 b95479a5 Michael Hanselmann
1518 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1519 b459a848 Andrea Spadaccini
  def GetLockInfo(self, requested): # pylint: disable=W0613
1520 fcb21ad7 Michael Hanselmann
    """Retrieves information about waiting jobs.
1521 fcb21ad7 Michael Hanselmann

1522 fcb21ad7 Michael Hanselmann
    @type requested: set
1523 fcb21ad7 Michael Hanselmann
    @param requested: Requested information, see C{query.LQ_*}
1524 fcb21ad7 Michael Hanselmann

1525 fcb21ad7 Michael Hanselmann
    """
1526 fcb21ad7 Michael Hanselmann
    # No need to sort here, that's being done by the lock manager and query
1527 fcb21ad7 Michael Hanselmann
    # library. There are no priorities for notifying jobs, hence all show up as
1528 fcb21ad7 Michael Hanselmann
    # one item under "pending".
1529 fcb21ad7 Michael Hanselmann
    return [("job/%s" % job_id, None, None,
1530 fcb21ad7 Michael Hanselmann
             [("job", [job.id for job in waiters])])
1531 fcb21ad7 Michael Hanselmann
            for job_id, waiters in self._waiters.items()
1532 fcb21ad7 Michael Hanselmann
            if waiters]
1533 fcb21ad7 Michael Hanselmann
1534 fcb21ad7 Michael Hanselmann
  @locking.ssynchronized(_LOCK, shared=1)
1535 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
1536 b95479a5 Michael Hanselmann
    """Checks if a job is waiting.
1537 b95479a5 Michael Hanselmann

1538 b95479a5 Michael Hanselmann
    """
1539 b95479a5 Michael Hanselmann
    return compat.any(job in jobs
1540 b95479a5 Michael Hanselmann
                      for jobs in self._waiters.values())
1541 b95479a5 Michael Hanselmann
1542 b95479a5 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
1543 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1544 b95479a5 Michael Hanselmann
    """Checks if a dependency job has the requested status.
1545 b95479a5 Michael Hanselmann

1546 b95479a5 Michael Hanselmann
    If the other job is not yet in a finalized status, the calling job will be
1547 b95479a5 Michael Hanselmann
    notified (re-added to the workerpool) at a later point.
1548 b95479a5 Michael Hanselmann

1549 b95479a5 Michael Hanselmann
    @type job: L{_QueuedJob}
1550 b95479a5 Michael Hanselmann
    @param job: Job object
1551 76b62028 Iustin Pop
    @type dep_job_id: int
1552 b95479a5 Michael Hanselmann
    @param dep_job_id: ID of dependency job
1553 b95479a5 Michael Hanselmann
    @type dep_status: list
1554 b95479a5 Michael Hanselmann
    @param dep_status: Required status
1555 b95479a5 Michael Hanselmann

1556 b95479a5 Michael Hanselmann
    """
1557 76b62028 Iustin Pop
    assert ht.TJobId(job.id)
1558 76b62028 Iustin Pop
    assert ht.TJobId(dep_job_id)
1559 b95479a5 Michael Hanselmann
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1560 b95479a5 Michael Hanselmann
1561 b95479a5 Michael Hanselmann
    if job.id == dep_job_id:
1562 b95479a5 Michael Hanselmann
      return (self.ERROR, "Job can't depend on itself")
1563 b95479a5 Michael Hanselmann
1564 b95479a5 Michael Hanselmann
    # Get status of dependency job
1565 b95479a5 Michael Hanselmann
    try:
1566 b95479a5 Michael Hanselmann
      status = self._getstatus_fn(dep_job_id)
1567 b95479a5 Michael Hanselmann
    except errors.JobLost, err:
1568 b95479a5 Michael Hanselmann
      return (self.ERROR, "Dependency error: %s" % err)
1569 b95479a5 Michael Hanselmann
1570 b95479a5 Michael Hanselmann
    assert status in constants.JOB_STATUS_ALL
1571 b95479a5 Michael Hanselmann
1572 b95479a5 Michael Hanselmann
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1573 b95479a5 Michael Hanselmann
1574 b95479a5 Michael Hanselmann
    if status not in constants.JOBS_FINALIZED:
1575 b95479a5 Michael Hanselmann
      # Register for notification and wait for job to finish
1576 b95479a5 Michael Hanselmann
      job_id_waiters.add(job)
1577 b95479a5 Michael Hanselmann
      return (self.WAIT,
1578 b95479a5 Michael Hanselmann
              "Need to wait for job %s, wanted status '%s'" %
1579 b95479a5 Michael Hanselmann
              (dep_job_id, dep_status))
1580 b95479a5 Michael Hanselmann
1581 b95479a5 Michael Hanselmann
    # Remove from waiters list
1582 b95479a5 Michael Hanselmann
    if job in job_id_waiters:
1583 b95479a5 Michael Hanselmann
      job_id_waiters.remove(job)
1584 b95479a5 Michael Hanselmann
1585 b95479a5 Michael Hanselmann
    if (status == constants.JOB_STATUS_CANCELED and
1586 b95479a5 Michael Hanselmann
        constants.JOB_STATUS_CANCELED not in dep_status):
1587 b95479a5 Michael Hanselmann
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1588 b95479a5 Michael Hanselmann
1589 b95479a5 Michael Hanselmann
    elif not dep_status or status in dep_status:
1590 b95479a5 Michael Hanselmann
      return (self.CONTINUE,
1591 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s'" %
1592 b95479a5 Michael Hanselmann
              (dep_job_id, status))
1593 b95479a5 Michael Hanselmann
1594 b95479a5 Michael Hanselmann
    else:
1595 b95479a5 Michael Hanselmann
      return (self.WRONGSTATUS,
1596 b95479a5 Michael Hanselmann
              "Dependency job %s finished with status '%s',"
1597 b95479a5 Michael Hanselmann
              " not one of '%s' as required" %
1598 b95479a5 Michael Hanselmann
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1599 b95479a5 Michael Hanselmann
1600 37d76f1e Michael Hanselmann
  def _RemoveEmptyWaitersUnlocked(self):
1601 37d76f1e Michael Hanselmann
    """Remove all jobs without actual waiters.
1602 37d76f1e Michael Hanselmann

1603 37d76f1e Michael Hanselmann
    """
1604 37d76f1e Michael Hanselmann
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1605 37d76f1e Michael Hanselmann
                   if not waiters]:
1606 37d76f1e Michael Hanselmann
      del self._waiters[job_id]
1607 37d76f1e Michael Hanselmann
1608 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
1609 b95479a5 Michael Hanselmann
    """Notifies all jobs waiting for a certain job ID.
1610 b95479a5 Michael Hanselmann

1611 1316ebc2 Michael Hanselmann
    @attention: Do not call until L{CheckAndRegister} returned a status other
1612 1316ebc2 Michael Hanselmann
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1613 76b62028 Iustin Pop
    @type job_id: int
1614 b95479a5 Michael Hanselmann
    @param job_id: Job ID
1615 b95479a5 Michael Hanselmann

1616 b95479a5 Michael Hanselmann
    """
1617 76b62028 Iustin Pop
    assert ht.TJobId(job_id)
1618 b95479a5 Michael Hanselmann
1619 37d76f1e Michael Hanselmann
    self._lock.acquire()
1620 37d76f1e Michael Hanselmann
    try:
1621 37d76f1e Michael Hanselmann
      self._RemoveEmptyWaitersUnlocked()
1622 37d76f1e Michael Hanselmann
1623 37d76f1e Michael Hanselmann
      jobs = self._waiters.pop(job_id, None)
1624 37d76f1e Michael Hanselmann
    finally:
1625 37d76f1e Michael Hanselmann
      self._lock.release()
1626 37d76f1e Michael Hanselmann
1627 b95479a5 Michael Hanselmann
    if jobs:
1628 b95479a5 Michael Hanselmann
      # Re-add jobs to workerpool
1629 b95479a5 Michael Hanselmann
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1630 b95479a5 Michael Hanselmann
                    len(jobs), job_id)
1631 b95479a5 Michael Hanselmann
      self._enqueue_fn(jobs)
1632 b95479a5 Michael Hanselmann
1633 b95479a5 Michael Hanselmann
1634 c8d0be94 Michael Hanselmann
def _RequireNonDrainedQueue(fn):
1635 c8d0be94 Michael Hanselmann
  """Decorator checking for a non-drained queue.
1636 c8d0be94 Michael Hanselmann

1637 c8d0be94 Michael Hanselmann
  To be used with functions submitting new jobs.
1638 c8d0be94 Michael Hanselmann

1639 c8d0be94 Michael Hanselmann
  """
1640 c8d0be94 Michael Hanselmann
  def wrapper(self, *args, **kwargs):
1641 c8d0be94 Michael Hanselmann
    """Wrapper function.
1642 c8d0be94 Michael Hanselmann

1643 c8d0be94 Michael Hanselmann
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1644 c8d0be94 Michael Hanselmann

1645 c8d0be94 Michael Hanselmann
    """
1646 c8d0be94 Michael Hanselmann
    # Ok when sharing the big job queue lock, as the drain file is created when
1647 c8d0be94 Michael Hanselmann
    # the lock is exclusive.
1648 c8d0be94 Michael Hanselmann
    # Needs access to protected member, pylint: disable=W0212
1649 c8d0be94 Michael Hanselmann
    if self._drained:
1650 c8d0be94 Michael Hanselmann
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1651 6d5ea385 Michael Hanselmann
1652 6d5ea385 Michael Hanselmann
    if not self._accepting_jobs:
1653 6d5ea385 Michael Hanselmann
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1654 6d5ea385 Michael Hanselmann
1655 c8d0be94 Michael Hanselmann
    return fn(self, *args, **kwargs)
1656 c8d0be94 Michael Hanselmann
  return wrapper
1657 c8d0be94 Michael Hanselmann
1658 c8d0be94 Michael Hanselmann
1659 6c881c52 Iustin Pop
class JobQueue(object):
1660 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1661 db37da70 Michael Hanselmann

1662 6c881c52 Iustin Pop
  """
1663 f47b32a8 Petr Pudlak
  def __init__(self, context, cfg):
    """Constructor for JobQueue.

    Sets up the in-memory state of the queue: job cache, queue lock, last
    serial number, list of master candidate nodes, queue size, drain flag,
    dependency manager and worker pool.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects
    @param cfg: configuration object; only its C{GetAllNodesInfo} method is
        used here, to find the master candidates

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it, be it shared or exclusive.
    queue_lock = locking.SharedLock("JobQueue")
    self._lock = queue_lock
    self.acquire = queue_lock.acquire
    self.release = queue_lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (name to primary IP, master candidates only)
    self._nodes = {}
    for node in cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        self._nodes[node.name] = node.primary_ip

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    dependency_manager = _JobDependencyManager(
      self._GetJobStatusForDependencies, self._EnqueueJobs)
    self.depmgr = dependency_manager
    self.context.glm.AddToLockMonitor(dependency_manager)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
1720 711b5124 Michael Hanselmann
1721 fcd70b89 Klaus Aehlig
  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue

    Pick up a job that already is in the job queue and start/resume it.
    Queued and waiting jobs are handed to the worker pool again (waiting
    jobs get their unfinished opcodes reset to queued first); running and
    canceling jobs are finalized with an error. Unless the job could not be
    read, it is written back via L{UpdateJobUnlocked}.

    @param job_id: ID of the job to pick up

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    job.AddReasons(pickup=True)

    status = job.CalcStatus()
    restartable = (constants.JOB_STATUS_QUEUED,
                   constants.JOB_STATUS_WAITING)
    interrupted = (constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_CANCELING)

    if status in restartable:
      if status == constants.JOB_STATUS_WAITING:
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)

      job.SetPid(os.getpid())
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in interrupted:
      logging.warning("Unfinished job %s found: %s", job.id, job)

      # The job was being processed when the daemon went away
      shutdown_error = errors.OpExecError("Unclean master daemon shutdown")
      job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                            _EncodeOpError(shutdown_error))
      job.Finalize()

    self.UpdateJobUnlocked(job)
1758 fcd70b89 Klaus Aehlig
1759 fcd70b89 Klaus Aehlig
  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    """Pick up a job that already is in the job queue.

    Synchronized counterpart of L{_PickupJobUnlocked}.

    @param job_id: ID of the job to load and start/resume

    """
    return self._PickupJobUnlocked(job_id)
1762 fcd70b89 Klaus Aehlig
1763 fb1ffbca Michael Hanselmann
  def _GetRpc(self, address_list):
    """Builds a job queue RPC runner using this queue's context.

    @param address_list: passed on unchanged to L{rpc.JobQueueRunner}
    @return: the newly created runner

    """
    runner = rpc.JobQueueRunner(self.context, address_list)
    return runner
1768 fb1ffbca Michael Hanselmann
1769 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A master candidate then
    receives all non-archived job files, the serial file and the current
    drain flag before it is recorded in the node list; any other node is
    dropped from the node list instead. RPC failures are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = self._GetRpc(None).call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # Not a candidate: forget the node (if known) and skip the replication
      # of the job files
      self._nodes.pop(name, None)
      return

    # Whole queue excluding archived jobs, followed by the serial file
    to_upload = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]
    to_upload.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for path in to_upload:
      content = utils.ReadFile(path)

      upload = _CallJqUpdate(self._GetRpc(addrs), [name], path, content)
      upload_error = upload[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    # Set queue drained flag
    rpc_runner = self._GetRpc(addrs)
    drain = rpc_runner.call_jobqueue_set_drain_flag([name], self._drained)
    drain_error = drain[name].fail_msg
    if drain_error:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    name, drain_error)

    self._nodes[name] = node.primary_ip
1823 d2e03a33 Michael Hanselmann
1824 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    if node_name in self._nodes:
      del self._nodes[node_name]
1833 23752136 Michael Hanselmann
1834 7e950d31 Iustin Pop
  @staticmethod
1835 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1836 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1837 ea03467c Iustin Pop

1838 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1839 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1840 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1841 ea03467c Iustin Pop

1842 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1843 ea03467c Iustin Pop
    @type nodes: list
1844 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1845 ea03467c Iustin Pop
    @type failmsg: str
1846 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1847 ea03467c Iustin Pop

1848 ea03467c Iustin Pop
    """
1849 e74798c1 Michael Hanselmann
    failed = []
1850 e74798c1 Michael Hanselmann
    success = []
1851 e74798c1 Michael Hanselmann
1852 e74798c1 Michael Hanselmann
    for node in nodes:
1853 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1854 c8457ce7 Iustin Pop
      if msg:
1855 e74798c1 Michael Hanselmann
        failed.append(node)
1856 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1857 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1858 c8457ce7 Iustin Pop
      else:
1859 c8457ce7 Iustin Pop
        success.append(node)
1860 e74798c1 Michael Hanselmann
1861 e74798c1 Michael Hanselmann
    # +1 for the master node
1862 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1863 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1864 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1865 e74798c1 Michael Hanselmann
1866 99aabbed Iustin Pop
  def _GetNodeIp(self):
1867 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1868 99aabbed Iustin Pop

1869 ea03467c Iustin Pop
    @rtype: (list, list)
1870 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1871 ea03467c Iustin Pop
        names and the second one with the node addresses
1872 ea03467c Iustin Pop

1873 99aabbed Iustin Pop
    """
1874 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1875 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1876 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1877 99aabbed Iustin Pop
    return name_list, addr_list
1878 99aabbed Iustin Pop
1879 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The local file is always replaced; the contents are sent to the nodes
    in the node list only if requested, and the RPC results are checked
    through L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    rpc_runner = self._GetRpc(addrs)
    result = _CallJqUpdate(rpc_runner, names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1902 23752136 Michael Hanselmann
1903 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are carried out on the local node first (creating missing
    directories); the same list is then sent to the nodes in the node list.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (source, target) in rename:
      utils.RenameFile(source, target, mkdir=True)

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    rpc_runner = self._GetRpc(addrs)
    result = rpc_runner.call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1921 abc1f2ce Michael Hanselmann
1922 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @param job_id: the job identifier; it is formatted with C{%s} (callers
        in this class pass integer job IDs)
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(pathutils.QUEUE_DIR, file_name)
1933 f1da30e6 Michael Hanselmann
1934 1410a389 Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @param job_id: the job identifier; it is formatted with C{%s} (callers
        in this class pass integer job IDs)
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = jstore.GetArchiveDirectory(job_id)
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                          "job-%s" % job_id)
1947 0cb94105 Michael Hanselmann
1948 cb66225d Michael Hanselmann
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    The queue directory always comes first; if requested, it is followed by
    one entry per visible file in the archive directory.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    directories = [pathutils.QUEUE_DIR]

    if not archived:
      return directories

    archive_root = pathutils.JOB_QUEUE_ARCHIVE_DIR
    for entry in utils.ListVisibleFiles(archive_root):
      directories.append(utils.PathJoin(archive_root, entry))

    return directories
1965 0422250e Michael Hanselmann
1966 0422250e Michael Hanselmann
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to also look into the archive directories
    @rtype: list
    @return: the list of job IDs

    """
    job_ids = []

    for directory in cls._DetermineJobDirectories(archived):
      for entry in utils.ListVisibleFiles(directory):
        match = constants.JOB_FILE_RE.match(entry)
        if not match:
          # Not a job file
          continue
        job_ids.append(int(match.group(1)))

    if sort:
      job_ids.sort()

    return job_ids
1991 911a495b Iustin Pop
1992 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache. A job file which can't
    be parsed is moved to the archive and C{None} is returned.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      assert cached.writable, "Found read-only job in memcache"
      return cached

    try:
      job = self._LoadJobFromDisk(job_id, False)
    except errors.JobFileCorrupted:
      live_path = self._GetJobPath(job_id)
      archive_path = self._GetArchivedJobPath(job_id)
      if live_path != archive_path:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(live_path, archive_path)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
2034 162c8636 Guido Trotter
2035 c0f6d0d8 Michael Hanselmann
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2036 162c8636 Guido Trotter
    """Load the given job file from disk.
2037 162c8636 Guido Trotter

2038 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
2039 162c8636 Guido Trotter

2040 76b62028 Iustin Pop
    @type job_id: int
2041 162c8636 Guido Trotter
    @param job_id: job identifier
2042 194c8ca4 Michael Hanselmann
    @type try_archived: bool
2043 194c8ca4 Michael Hanselmann
    @param try_archived: Whether to try loading an archived job
2044 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
2045 162c8636 Guido Trotter
    @return: either None or the job object
2046 162c8636 Guido Trotter

2047 162c8636 Guido Trotter
    """
2048 8a3cd185 Michael Hanselmann
    path_functions = [(self._GetJobPath, False)]
2049 194c8ca4 Michael Hanselmann
2050 194c8ca4 Michael Hanselmann
    if try_archived:
2051 8a3cd185 Michael Hanselmann
      path_functions.append((self._GetArchivedJobPath, True))
2052 194c8ca4 Michael Hanselmann
2053 194c8ca4 Michael Hanselmann
    raw_data = None
2054 8a3cd185 Michael Hanselmann
    archived = None
2055 194c8ca4 Michael Hanselmann
2056 8a3cd185 Michael Hanselmann
    for (fn, archived) in path_functions:
2057 194c8ca4 Michael Hanselmann
      filepath = fn(job_id)
2058 194c8ca4 Michael Hanselmann
      logging.debug("Loading job from %s", filepath)
2059 194c8ca4 Michael Hanselmann
      try:
2060 194c8ca4 Michael Hanselmann
        raw_data = utils.ReadFile(filepath)
2061 194c8ca4 Michael Hanselmann
      except EnvironmentError, err:
2062 194c8ca4 Michael Hanselmann
        if err.errno != errno.ENOENT:
2063 194c8ca4 Michael Hanselmann
          raise
2064 194c8ca4 Michael Hanselmann
      else:
2065 194c8ca4 Michael Hanselmann
        break
2066 194c8ca4 Michael Hanselmann
2067 194c8ca4 Michael Hanselmann
    if not raw_data:
2068 194c8ca4 Michael Hanselmann
      return None
2069 13998ef2 Michael Hanselmann
2070 c0f6d0d8 Michael Hanselmann
    if writable is None:
2071 8a3cd185 Michael Hanselmann
      writable = not archived
2072 c0f6d0d8 Michael Hanselmann
2073 94ed59a5 Iustin Pop
    try:
2074 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
2075 8a3cd185 Michael Hanselmann
      job = _QueuedJob.Restore(self, data, writable, archived)
2076 b459a848 Andrea Spadaccini
    except Exception, err: # pylint: disable=W0703
2077 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
2078 94ed59a5 Iustin Pop
2079 ac0930b9 Iustin Pop
    return job
2080 f1da30e6 Michael Hanselmann
2081 c0f6d0d8 Michael Hanselmann
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @param writable: passed on unchanged to L{_LoadJobFromDisk}
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
2101 0f9c08dc Guido Trotter
2102 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    The size is the number of job IDs found by L{_GetJobIDsUnlocked}
    without looking at archived jobs.

    """
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)
2107 20571a26 Guido Trotter
2108 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is changed locally first, remembered on the queue object and
    then sent to all nodes in the node list.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always C{True}

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    rpc_runner = self._GetRpc(addrs)
    result = rpc_runner.call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True
2129 3ccafd0e Iustin Pop
2130 704b51ff Klaus Aehlig
  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    The request is forwarded through a luxi client connected to the query
    socket.

    @param ops: passed on unchanged to the client's C{SubmitJob}
    @return: the value returned by the client call

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitJob(ops)
2136 2971c913 Iustin Pop
2137 704b51ff Klaus Aehlig
  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained. The request is forwarded
    through a luxi client connected to the query socket.

    @param ops: passed on unchanged to the client's
        C{SubmitJobToDrainedQueue}
    @return: the value returned by the client call

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitJobToDrainedQueue(ops)
2146 346c3037 Klaus Aehlig
2147 704b51ff Klaus Aehlig
  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    The request is forwarded through a luxi client connected to the query
    socket.

    @param jobs: passed on unchanged to the client's C{SubmitManyJobs}
    @return: the value returned by the client call

    """
    client = luxi.Client(address=pathutils.QUERY_SOCKET)
    return client.SubmitManyJobs(jobs)
2153 2971c913 Iustin Pop
2154 b247c6fc Michael Hanselmann
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    @param msg: the error message
    @param ops: opcodes of the job; the result of each one's C{Summary}
        method is appended to the message
    @rtype: str
    @return: the message followed by the comma-joined opcode summaries

    """
    summaries = utils.CommaJoin(op.Summary() for op in ops)
    return "%s; opcodes %s" % (msg, summaries)
2161 b247c6fc Michael Hanselmann
2162 b247c6fc Michael Hanselmann
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    Dependencies whose job ID is not relative are copied unchanged;
    processing stops at the first relative ID which can't be resolved.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID; expected to
      raise C{IndexError} for IDs it can't resolve
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    resolved = []

    for (dep_job_id, dep_status) in deps:
      if not ht.TRelativeJobId(dep_job_id):
        # Already an absolute job ID
        resolved.append((dep_job_id, dep_status))
        continue

      assert ht.TInt(dep_job_id) and dep_job_id < 0
      try:
        absolute_id = resolve_fn(dep_job_id)
      except IndexError:
        # Abort
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)

      resolved.append((absolute_id, dep_status))

    return (True, resolved)
2192 b247c6fc Michael Hanselmann
2193 75d81fc8 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Adds jobs to the worker pool's queue while holding the queue lock.

    The lock is acquired by the decorator; the work itself is done by
    L{_EnqueueJobsUnlocked}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    result = self._EnqueueJobsUnlocked(jobs)
    return result
2202 75d81fc8 Michael Hanselmann
2203 75d81fc8 Michael Hanselmann
  def _EnqueueJobsUnlocked(self, jobs):
    """Adds jobs to the worker pool's queue; the caller must hold the lock.

    Every job becomes one task whose only argument is the job itself. The
    task priority comes from the job's C{CalcPriority()} and the task ID
    from C{_GetIdAttr}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"

    task_args = [(job, ) for job in jobs]
    priorities = [job.CalcPriority() for job in jobs]
    task_ids = map(_GetIdAttr, jobs)

    self._wpool.AddManyTasks(task_args, priority=priorities,
                             task_id=task_ids)
2214 7b5c4a69 Michael Hanselmann
2215 b95479a5 Michael Hanselmann
  def _GetJobStatusForDependencies(self, job_id):
2216 b95479a5 Michael Hanselmann
    """Gets the status of a job for dependencies.
2217 b95479a5 Michael Hanselmann

2218 76b62028 Iustin Pop
    @type job_id: int
2219 b95479a5 Michael Hanselmann
    @param job_id: Job ID
2220 b95479a5 Michael Hanselmann
    @raise errors.JobLost: If job can't be found
2221 b95479a5 Michael Hanselmann

2222 b95479a5 Michael Hanselmann
    """
2223 b95479a5 Michael Hanselmann
    # Not using in-memory cache as doing so would require an exclusive lock
2224 b95479a5 Michael Hanselmann
2225 b95479a5 Michael Hanselmann
    # Try to load from disk
2226 c0f6d0d8 Michael Hanselmann
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2227 c0f6d0d8 Michael Hanselmann
2228 17385bd2 Andrea Spadaccini
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2229 b95479a5 Michael Hanselmann
2230 b95479a5 Michael Hanselmann
    if job:
2231 b95479a5 Michael Hanselmann
      return job.CalcStatus()
2232 b95479a5 Michael Hanselmann
2233 b95479a5 Michael Hanselmann
    raise errors.JobLost("Job %s not found" % job_id)
2234 b95479a5 Michael Hanselmann
2235 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
    """Writes the current state of a job to its queue file.

    This has to be called after a job was modified so that the change is
    stored on disk and, if requested, replicated to the other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # A job has an end timestamp if and only if it is finalized
      is_finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      has_end_timestamp = job.end_timestamp is not None
      assert is_finalized == has_end_timestamp
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
2258 ac0930b9 Iustin Pop
2259 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits until a job changes or the timeout expires.

    The waiting is delegated to a L{_WaitForJobChangesHelper} instance,
    which receives the path of the job file and a function loading the job
    read-only from disk.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_job = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                              writable=False)
    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, load_job, fields, prev_job_info,
                  prev_log_serial, timeout)
2290 dfe57c22 Michael Hanselmann
2291 c061d046 Petr Pudlak
  def HasJobBeenFinalized(self, job_id):
    """Tells whether a job has reached a final status.

    The job is loaded read-only from disk and its calculated status is
    compared against L{constants.JOBS_FINALIZED}.

    @type job_id: int
    @param job_id: Job identifier
    @rtype: boolean or None
    @return: True if the job has been finalized,
        False if it could be loaded but is not finalized,
        None if the job could not be loaded

    """
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
    if job is None:
      return None

    status = job.CalcStatus()
    return status in constants.JOBS_FINALIZED
2307 c061d046 Petr Pudlak
2308 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def CancelJob(self, job_id):
    """Cancels a job.

    The cancellation itself is carried out by the job's C{Cancel()} method,
    which is run through L{_ModifyJobUnlocked}.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.
    @return: the result of L{_ModifyJobUnlocked}

    """
    logging.info("Cancelling job %s", job_id)

    def _Cancel(job):
      return job.Cancel()

    return self._ModifyJobUnlocked(job_id, _Cancel)
2321 aebd0e4e Michael Hanselmann
2322 4679547e Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  def ChangeJobPriority(self, job_id, priority):
    """Assigns a new priority to a job.

    After the job accepted the new priority, the priority of its task in
    the worker pool is adjusted as well. A job without a task in the pool
    is only logged, not treated as an error.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority
    @return: the result of L{_ModifyJobUnlocked}
    @raise errors.GenericError: if the priority is not contained in
      L{constants.OP_PRIO_SUBMIT_VALID}

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    valid_priorities = constants.OP_PRIO_SUBMIT_VALID
    if priority not in valid_priorities:
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority,
                                 utils.CommaJoin(valid_priorities)))

    def _SetPriority(job):
      (changed, message) = job.ChangePriority(priority)

      if not changed:
        return (changed, message)

      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        logging.debug("Job %s is not in workerpool at this time", job.id)

      return (changed, message)

    return self._ModifyJobUnlocked(job_id, _SetPriority)
2351 4679547e Michael Hanselmann
2352 aebd0e4e Michael Hanselmann
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2353 aebd0e4e Michael Hanselmann
    """Modifies a job.
2354 aebd0e4e Michael Hanselmann

2355 aebd0e4e Michael Hanselmann
    @type job_id: int
2356 aebd0e4e Michael Hanselmann
    @param job_id: Job ID
2357 aebd0e4e Michael Hanselmann
    @type mod_fn: callable
2358 aebd0e4e Michael Hanselmann
    @param mod_fn: Modifying function, receiving job object as parameter,
2359 aebd0e4e Michael Hanselmann
      returning tuple of (status boolean, message string)
2360 aebd0e4e Michael Hanselmann

2361 aebd0e4e Michael Hanselmann
    """
2362 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
2363 188c5e0a Michael Hanselmann
    if not job:
2364 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
2365 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
2366 fbf0262f Michael Hanselmann
2367 aebd0e4e Michael Hanselmann
    assert job.writable, "Can't modify read-only job"
2368 aebd0e4e Michael Hanselmann
    assert not job.archived, "Can't modify archived job"
2369 c0f6d0d8 Michael Hanselmann
2370 aebd0e4e Michael Hanselmann
    (success, msg) = mod_fn(job)
2371 188c5e0a Michael Hanselmann
2372 099b2870 Michael Hanselmann
    if success:
2373 66bd7445 Michael Hanselmann
      # If the job was finalized (e.g. cancelled), this is the final write
2374 66bd7445 Michael Hanselmann
      # allowed. The job can be archived anytime.
2375 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
2376 fbf0262f Michael Hanselmann
2377 099b2870 Michael Hanselmann
    return (success, msg)
2378 fbf0262f Michael Hanselmann
2379 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
2380 d7fd1f28 Michael Hanselmann
    """Archives jobs.
2381 c609f802 Michael Hanselmann

2382 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
2383 25e7b43f Iustin Pop
    @param jobs: Job objects
2384 d7fd1f28 Michael Hanselmann
    @rtype: int
2385 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
2386 c609f802 Michael Hanselmann

2387 c609f802 Michael Hanselmann
    """
2388 d7fd1f28 Michael Hanselmann
    archive_jobs = []
2389 d7fd1f28 Michael Hanselmann
    rename_files = []
2390 d7fd1f28 Michael Hanselmann
    for job in jobs:
2391 c0f6d0d8 Michael Hanselmann
      assert job.writable, "Can't archive read-only job"
2392 8a3cd185 Michael Hanselmann
      assert not job.archived, "Can't cancel archived job"
2393 c0f6d0d8 Michael Hanselmann
2394 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2395 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
2396 d7fd1f28 Michael Hanselmann
        continue
2397 c609f802 Michael Hanselmann
2398 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
2399 c609f802 Michael Hanselmann
2400 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
2401 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
2402 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
2403 c609f802 Michael Hanselmann
2404 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
2405 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
2406 f1da30e6 Michael Hanselmann
2407 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
2408 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
2409 d7fd1f28 Michael Hanselmann
2410 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2411 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
2412 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
2413 20571a26 Guido Trotter
    # archived jobs to fix this.
2414 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
2415 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
2416 78d12585 Michael Hanselmann
2417 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def ArchiveJob(self, job_id):
    """Archives a single job.

    The job is loaded and handed to L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; C{False} as well if the job could
      not be loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      return self._ArchiveJobsUnlocked([job]) == 1

    logging.debug("Job %s not found", job_id)
    return False
2437 07cd723a Iustin Pop
2438 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and the received timestamp if there is
    no start timestamp either. The special '-1' age selects all jobs; jobs
    which are not finalized are still skipped by L{_ArchiveJobsUnlocked}.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: time in seconds after which no further jobs are
      examined
    @rtype: tuple; (int, int)
    @return: number of archived jobs and number of job IDs which were not
      examined because the timeout was reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as touched only once it is really examined. Doing so
      # before the timeout check made the number of unchecked jobs one too
      # small whenever the loop was left early.
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp available for the job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
2492 07cd723a Iustin Pop
2493 e07f7f7a Michael Hanselmann
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs it refers to.

    If the query does not name specific jobs, all job IDs of the queue are
    used and jobs which fail to load are left out. For explicitly
    requested IDs an entry is always returned, with C{None} in place of a
    job which could not be loaded.

    @param fields: List of wanted fields
    @param qfilter: Query filter
    @rtype: tuple
    @return: the query object, a list of (job ID, job) pairs and a boolean
      telling whether all jobs were listed

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    requested_ids = qobj.RequestedNames()
    list_all = (requested_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      requested_ids = self._GetJobIDsUnlocked(archived=include_archived)

    loaded = []

    for job_id in requested_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if list_all and job is None:
        # Unloadable jobs are only reported when asked for by ID
        continue
      loaded.append((job_id, job))

    return (qobj, loaded, list_all)
2519 e07f7f7a Michael Hanselmann
2520 e07f7f7a Michael Hanselmann
  def QueryJobs(self, fields, qfilter):
    """Runs a query over the jobs in the queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the response built by L{query.GetQueryResponse}, not sorted
      by name

    """
    prepared = self._Query(fields, qfilter)
    (qobj, job_data, _) = prepared

    return query.GetQueryResponse(qobj, job_data, sort_by_name=False)
2532 e07f7f7a Michael Hanselmann
2533 e07f7f7a Michael Hanselmann
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # backwards compat: job IDs are coerced to integers. None is documented
    # as "all jobs" and must not be iterated over, which used to fail with
    # TypeError.
    # NOTE(review): assumes qlang.MakeSimpleFilter accepts None as "no
    # filter"; confirm against its implementation
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2552 e2715f69 Michael Hanselmann
2553 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    On the first call the queue stops accepting jobs and the worker pool is
    deactivated. Every call returns whether the worker pool still has
    running tasks; as long as that is the case, the job queue is not yet
    ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished
    jobs will be resumed next time.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    first_call = self._accepting_jobs
    if first_call:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    still_running = self._wpool.HasRunningTasks()
    return still_running
2577 6d5ea385 Michael Hanselmann
2578 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
    """Tells whether the queue currently accepts new jobs.

    The flag is cleared by L{PrepareShutdown}; from then on no new jobs are
    accepted and the queue is shutting down.

    @rtype: bool
    @return: the current value of the accepting-jobs flag

    """
    accepting = self._accepting_jobs
    return accepting
2588 942e2262 Michael Hanselmann
2589 6d5ea385 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
2590 e2715f69 Michael Hanselmann
  def Shutdown(self):
2591 e2715f69 Michael Hanselmann
    """Stops the job queue.
2592 e2715f69 Michael Hanselmann

2593 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
2594 ea03467c Iustin Pop

2595 e2715f69 Michael Hanselmann
    """
2596 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()