Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 0aeeb6e3

History | View | Annotate | Download (59.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 7f93570a Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 e2715f69 Michael Hanselmann
import logging
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 6c2549d6 Guido Trotter
try:
39 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
40 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
41 6c2549d6 Guido Trotter
except ImportError:
42 6c2549d6 Guido Trotter
  import pyinotify
43 6c2549d6 Guido Trotter
44 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
45 e2715f69 Michael Hanselmann
from ganeti import constants
46 f1da30e6 Michael Hanselmann
from ganeti import serializer
47 e2715f69 Michael Hanselmann
from ganeti import workerpool
48 99bd4f0a Guido Trotter
from ganeti import locking
49 f1da30e6 Michael Hanselmann
from ganeti import opcodes
50 7a1ecaed Iustin Pop
from ganeti import errors
51 e2715f69 Michael Hanselmann
from ganeti import mcpu
52 7996a135 Iustin Pop
from ganeti import utils
53 04ab05ce Michael Hanselmann
from ganeti import jstore
54 c3f0a12f Iustin Pop
from ganeti import rpc
55 82b22e19 René Nussbaumer
from ganeti import runtime
56 a744b676 Manuel Franceschini
from ganeti import netutils
57 989a8bee Michael Hanselmann
from ganeti import compat
58 e2715f69 Michael Hanselmann
59 fbf0262f Michael Hanselmann
60 1daae384 Iustin Pop
# Number of worker threads we start for processing jobs
JOBQUEUE_THREADS = 25
61 58b22b6e Michael Hanselmann
# NOTE(review): presumably the number of archived jobs grouped into one
# archive directory; the code using it is outside this view -- confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62 e2715f69 Michael Hanselmann
63 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
64 ebb80afa Guido Trotter
# NOTE(review): presumably the name of the attribute holding the lock itself;
# not referenced in the visible part of this file -- confirm against users
_LOCK = "_lock"
65 ebb80afa Guido Trotter
# Name of the attribute holding the job queue; _OpExecCallbacks stores the
# queue as self._queue and passes this name to locking.ssynchronized
_QUEUE = "_queue"
66 99bd4f0a Guido Trotter
67 498ae1cc Iustin Pop
68 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to abort a job which was asked to cancel.

  It carries no data of its own; raising it is the whole signal.

  """
72 fbf0262f Michael Hanselmann
73 fbf0262f Michael Hanselmann
74 70552c46 Michael Hanselmann
def TimeStampNow():
  """Builds a timestamp for the current moment.

  The wall-clock time is read once and handed to L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
82 70552c46 Michael Hanselmann
83 70552c46 Michael Hanselmann
84 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Wrapper keeping the queue-related state of a single opcode.

  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar priority: the priority of this opcode
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Wraps a freshly submitted opcode.

    The new object starts in the queued status, with an empty log, no
    result and no timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.log = []
    self.result = None
    self.status = constants.OP_STATUS_QUEUED

    # No phase of the execution has been reached yet
    self.start_timestamp = self.exec_timestamp = self.end_timestamp = None

    # Initial priority only; it might change during the lifetime of this
    # opcode
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    The constructor is bypassed; every attribute is taken from C{state}.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])

    # These keys are mandatory, a missing one raises KeyError
    for key in ("status", "result", "log"):
      setattr(restored, key, state[key])

    # Timestamps are optional in the serialized data
    for key in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, key, state.get(key))

    restored.priority = state.get("priority", constants.OP_PRIO_DEFAULT)

    return restored

  def Serialize(self):
    """Converts this _QueuedOpCode into a dictionary.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    serialized = {"input": self.input.__getstate__()}

    # All remaining attributes are stored under their own name
    for name in ("status", "result", "log", "start_timestamp",
                 "exec_timestamp", "end_timestamp", "priority"):
      serialized[name] = getattr(self, name)

    return serialized
157 f1048938 Iustin Pop
158 e2715f69 Michael Hanselmann
159 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: serial number of the most recent log entry (0 if there
      is none yet)
  @ivar ops_iter: in-memory only, reset to None on creation and restore
  @ivar cur_opctx: in-memory only, reset to None on creation and restore
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise errors.GenericError: when no opcodes are given

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    These attributes are not part of the serialized state, hence they are
    reset both for new and for restored jobs.

    @type obj: L{_QueuedJob}
    @param obj: the job object to initialize

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    """Returns a debug representation with the job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide serial is the highest serial found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    In-memory only attributes and the parent queue are not included.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped; if all opcodes were successful,
        the job is successful
      - the first opcode found to be canceling, canceled or finished
        with error determines the job status and ends the iteration
      - an opcode waiting for locks or running sets the job status
        accordingly, but the iteration goes on, hence a later opcode
        can still override it
      - queued opcodes don't change the status, which starts out as
        queued

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Use the job constant like all other branches, not the opcode one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int
    @return: the smallest priority value among the unfinished opcodes

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # The first element of a log entry is its serial number
      entries.extend(entry for entry in op.log if entry[0] > serial)

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are expected only at the start of the list
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    A queued job is canceled right away, a job waiting for locks is only
    marked as canceling; in any other status nothing is changed.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
451 099b2870 Michael Hanselmann
452 f1048938 Iustin Pop
453 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks tying the execution of one opcode to its job and queue.

  """
  def __init__(self, queue, job, op):
    """Remembers the objects the callbacks operate on.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Aborts via L{CancelJob} if the opcode was marked as canceling.

    """
    if self._op.status != constants.OP_STATUS_CANCELING:
      return

    # Cancel here if we were asked to
    logging.debug("Canceling opcode")
    raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    opcode = self._op

    assert opcode in self._job.ops
    assert opcode.status in (constants.OP_STATUS_WAITLOCK,
                             constants.OP_STATUS_CANCELING)

    # Raises CancelJob if a cancellation was requested in the meantime
    self._CheckCancel()

    logging.debug("Opcode is now running")

    opcode.status = constants.OP_STATUS_RUNNING
    opcode.exec_timestamp = TimeStampNow()

    # The changed job must be passed on to the queue as the last step
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds one entry to the opcode log, with locks.

    The job-wide log serial is increased first and then used for the
    new entry; the job is updated in the queue with C{replicate=False}.

    """
    job = self._job

    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Either just the message or the log type followed by the message
    can be given; with a single argument the type is
    C{constants.ELOG_MESSAGE}.

    """
    assert len(args) < 3

    if len(args) != 1:
      (log_type, log_msg) = args
    else:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]

    # Splitting the time makes serialization easier and keeps the
    # precision
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    Raises L{CancelJob} if the opcode was marked as canceling.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # No locking here, the job queue does it
    return self._queue.SubmitManyJobs(jobs)
551 6a373640 Michael Hanselmann
552 031a3e57 Michael Hanselmann
553 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
554 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
555 989a8bee Michael Hanselmann
    """Initializes this class.
556 6c2549d6 Guido Trotter

557 989a8bee Michael Hanselmann
    @type fields: list of strings
558 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
559 989a8bee Michael Hanselmann
    @type prev_job_info: string
560 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
561 989a8bee Michael Hanselmann
    @type prev_log_serial: string
562 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
563 6c2549d6 Guido Trotter

564 989a8bee Michael Hanselmann
    """
565 989a8bee Michael Hanselmann
    self._fields = fields
566 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
567 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
568 6c2549d6 Guido Trotter
569 989a8bee Michael Hanselmann
  def __call__(self, job):
570 989a8bee Michael Hanselmann
    """Checks whether job has changed.
571 6c2549d6 Guido Trotter

572 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
573 989a8bee Michael Hanselmann
    @param job: Job object
574 6c2549d6 Guido Trotter

575 6c2549d6 Guido Trotter
    """
576 989a8bee Michael Hanselmann
    status = job.CalcStatus()
577 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
578 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
579 6c2549d6 Guido Trotter
580 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
581 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
582 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
583 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
584 6c2549d6 Guido Trotter
    # significantly different.
585 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
586 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
587 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
588 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
589 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
590 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
591 6c2549d6 Guido Trotter
592 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
593 6c2549d6 Guido Trotter
    # no changes.
594 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
595 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
596 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_WAITLOCK) or
597 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
598 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
599 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
600 989a8bee Michael Hanselmann
      return (job_info, log_entries)
601 6c2549d6 Guido Trotter
602 989a8bee Michael Hanselmann
    return None
603 989a8bee Michael Hanselmann
604 989a8bee Michael Hanselmann
605 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
606 989a8bee Michael Hanselmann
  def __init__(self, filename):
607 989a8bee Michael Hanselmann
    """Initializes this class.
608 989a8bee Michael Hanselmann

609 989a8bee Michael Hanselmann
    @type filename: string
610 989a8bee Michael Hanselmann
    @param filename: Path to job file
611 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
612 6c2549d6 Guido Trotter

613 989a8bee Michael Hanselmann
    """
614 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
615 989a8bee Michael Hanselmann
    self._inotify_handler = \
616 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
617 989a8bee Michael Hanselmann
    self._notifier = \
618 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
619 989a8bee Michael Hanselmann
    try:
620 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
621 989a8bee Michael Hanselmann
    except Exception:
622 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
623 989a8bee Michael Hanselmann
      self._notifier.stop()
624 989a8bee Michael Hanselmann
      raise
625 989a8bee Michael Hanselmann
626 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
627 989a8bee Michael Hanselmann
    """Callback for inotify.
628 989a8bee Michael Hanselmann

629 989a8bee Michael Hanselmann
    """
630 6c2549d6 Guido Trotter
    if not notifier_enabled:
631 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
632 989a8bee Michael Hanselmann
633 989a8bee Michael Hanselmann
  def Wait(self, timeout):
634 989a8bee Michael Hanselmann
    """Waits for the job file to change.
635 989a8bee Michael Hanselmann

636 989a8bee Michael Hanselmann
    @type timeout: float
637 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
638 989a8bee Michael Hanselmann
    @return: Whether there have been events
639 989a8bee Michael Hanselmann

640 989a8bee Michael Hanselmann
    """
641 989a8bee Michael Hanselmann
    assert timeout >= 0
642 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
643 989a8bee Michael Hanselmann
    if have_events:
644 989a8bee Michael Hanselmann
      self._notifier.read_events()
645 989a8bee Michael Hanselmann
    self._notifier.process_events()
646 989a8bee Michael Hanselmann
    return have_events
647 989a8bee Michael Hanselmann
648 989a8bee Michael Hanselmann
  def Close(self):
649 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
650 989a8bee Michael Hanselmann

651 989a8bee Michael Hanselmann
    """
652 989a8bee Michael Hanselmann
    self._notifier.stop()
653 989a8bee Michael Hanselmann
654 989a8bee Michael Hanselmann
655 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
656 989a8bee Michael Hanselmann
  def __init__(self, filename):
657 989a8bee Michael Hanselmann
    """Initializes this class.
658 989a8bee Michael Hanselmann

659 989a8bee Michael Hanselmann
    @type filename: string
660 989a8bee Michael Hanselmann
    @param filename: Path to job file
661 989a8bee Michael Hanselmann

662 989a8bee Michael Hanselmann
    """
663 989a8bee Michael Hanselmann
    self._filewaiter = None
664 989a8bee Michael Hanselmann
    self._filename = filename
665 6c2549d6 Guido Trotter
666 989a8bee Michael Hanselmann
  def Wait(self, timeout):
667 989a8bee Michael Hanselmann
    """Waits for a job to change.
668 6c2549d6 Guido Trotter

669 989a8bee Michael Hanselmann
    @type timeout: float
670 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
671 989a8bee Michael Hanselmann
    @return: Whether there have been events
672 989a8bee Michael Hanselmann

673 989a8bee Michael Hanselmann
    """
674 989a8bee Michael Hanselmann
    if self._filewaiter:
675 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
676 989a8bee Michael Hanselmann
677 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
678 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
679 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
680 989a8bee Michael Hanselmann
    # race condition.
681 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
682 989a8bee Michael Hanselmann
683 989a8bee Michael Hanselmann
    return True
684 989a8bee Michael Hanselmann
685 989a8bee Michael Hanselmann
  def Close(self):
686 989a8bee Michael Hanselmann
    """Closes underlying waiter.
687 989a8bee Michael Hanselmann

688 989a8bee Michael Hanselmann
    """
689 989a8bee Michael Hanselmann
    if self._filewaiter:
690 989a8bee Michael Hanselmann
      self._filewaiter.Close()
691 989a8bee Michael Hanselmann
692 989a8bee Michael Hanselmann
693 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
694 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
695 989a8bee Michael Hanselmann

696 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
697 989a8bee Michael Hanselmann
  the current job status has changed.
698 989a8bee Michael Hanselmann

699 989a8bee Michael Hanselmann
  """
700 989a8bee Michael Hanselmann
  @staticmethod
701 989a8bee Michael Hanselmann
  def _CheckForChanges(job_load_fn, check_fn):
702 989a8bee Michael Hanselmann
    job = job_load_fn()
703 989a8bee Michael Hanselmann
    if not job:
704 989a8bee Michael Hanselmann
      raise errors.JobLost()
705 989a8bee Michael Hanselmann
706 989a8bee Michael Hanselmann
    result = check_fn(job)
707 989a8bee Michael Hanselmann
    if result is None:
708 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
709 989a8bee Michael Hanselmann
710 989a8bee Michael Hanselmann
    return result
711 989a8bee Michael Hanselmann
712 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
713 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
714 989a8bee Michael Hanselmann
    """Waits for changes on a job.
715 989a8bee Michael Hanselmann

716 989a8bee Michael Hanselmann
    @type filename: string
717 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
718 989a8bee Michael Hanselmann
    @type job_load_fn: callable
719 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
720 989a8bee Michael Hanselmann
    @type fields: list of strings
721 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
722 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
723 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
724 989a8bee Michael Hanselmann
    @type prev_log_serial: int
725 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
726 989a8bee Michael Hanselmann
    @type timeout: float
727 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
728 989a8bee Michael Hanselmann

729 989a8bee Michael Hanselmann
    """
730 6c2549d6 Guido Trotter
    try:
731 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
732 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
733 989a8bee Michael Hanselmann
      try:
734 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
735 989a8bee Michael Hanselmann
                                          job_load_fn, check_fn),
736 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
737 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
738 989a8bee Michael Hanselmann
      finally:
739 989a8bee Michael Hanselmann
        waiter.Close()
740 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
741 6c2549d6 Guido Trotter
      return None
742 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
743 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
744 6c2549d6 Guido Trotter
745 6c2549d6 Guido Trotter
746 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Errors not derived from L{errors.GenericError} are first wrapped in an
  L{errors.OpExecError} carrying their string representation.

  @param err: the exception to encode
  @return: result of L{errors.EncodeException}

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
756 6760e4ed Michael Hanselmann
757 6760e4ed Michael Hanselmann
758 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
759 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
760 26d3fd2f Michael Hanselmann
    """Initializes this class.
761 26d3fd2f Michael Hanselmann

762 26d3fd2f Michael Hanselmann
    """
763 26d3fd2f Michael Hanselmann
    self._fn = fn
764 26d3fd2f Michael Hanselmann
    self._next = None
765 26d3fd2f Michael Hanselmann
766 26d3fd2f Michael Hanselmann
  def _Advance(self):
767 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
768 26d3fd2f Michael Hanselmann

769 26d3fd2f Michael Hanselmann
    """
770 26d3fd2f Michael Hanselmann
    if self._next is None:
771 26d3fd2f Michael Hanselmann
      self._next = self._fn()
772 26d3fd2f Michael Hanselmann
773 26d3fd2f Michael Hanselmann
  def Peek(self):
774 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
775 26d3fd2f Michael Hanselmann

776 26d3fd2f Michael Hanselmann
    """
777 26d3fd2f Michael Hanselmann
    self._Advance()
778 26d3fd2f Michael Hanselmann
    return self._next
779 26d3fd2f Michael Hanselmann
780 26d3fd2f Michael Hanselmann
  def Next(self):
781 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
782 26d3fd2f Michael Hanselmann

783 26d3fd2f Michael Hanselmann
    """
784 26d3fd2f Michael Hanselmann
    self._Advance()
785 26d3fd2f Michael Hanselmann
    result = self._next
786 26d3fd2f Michael Hanselmann
    self._next = None
787 26d3fd2f Michael Hanselmann
    return result
788 26d3fd2f Michael Hanselmann
789 26d3fd2f Michael Hanselmann
790 b80cc518 Michael Hanselmann
class _OpExecContext:
791 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
792 b80cc518 Michael Hanselmann
    """Initializes this class.
793 b80cc518 Michael Hanselmann

794 b80cc518 Michael Hanselmann
    """
795 b80cc518 Michael Hanselmann
    self.op = op
796 b80cc518 Michael Hanselmann
    self.index = index
797 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
798 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
799 b80cc518 Michael Hanselmann
800 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
801 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
802 26d3fd2f Michael Hanselmann
803 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
804 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
805 26d3fd2f Michael Hanselmann

806 26d3fd2f Michael Hanselmann
    """
807 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
808 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
809 26d3fd2f Michael Hanselmann
810 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
811 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
812 26d3fd2f Michael Hanselmann

813 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
814 26d3fd2f Michael Hanselmann

815 26d3fd2f Michael Hanselmann
    """
816 26d3fd2f Michael Hanselmann
    op = self.op
817 26d3fd2f Michael Hanselmann
818 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
819 26d3fd2f Michael Hanselmann
    # for locks?
820 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
821 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
822 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
823 26d3fd2f Michael Hanselmann
      op.priority -= 1
824 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
825 26d3fd2f Michael Hanselmann
      return True
826 26d3fd2f Michael Hanselmann
827 26d3fd2f Michael Hanselmann
    return False
828 26d3fd2f Michael Hanselmann
829 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
830 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
831 26d3fd2f Michael Hanselmann

832 26d3fd2f Michael Hanselmann
    """
833 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
834 26d3fd2f Michael Hanselmann
835 b80cc518 Michael Hanselmann
836 be760ba8 Michael Hanselmann
class _JobProcessor(object):
837 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Stores everything needed to process the job later on.

    @param queue: Job queue the job belongs to
    @param opexec_fn: Function used to execute a single opcode
    @param job: Job to be processed
    @param _timeout_strategy_factory: Callable to create new timeout strategy

    """
    self._timeout_strategy_factory = _timeout_strategy_factory
    self.job = job
    self.opexec_fn = opexec_fn
    self.queue = queue
846 be760ba8 Michael Hanselmann
847 be760ba8 Michael Hanselmann
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: Context for the next opcode which is not already finalized
        (a canceled opcode is returned as well, it is up to the caller to
        handle it)
    @raise errors.ProgrammerError: if there is no opcode left or an opcode
        is marked as running

    """
    # The iterator is kept on the job and serves as a cache: later lookups
    # continue where the previous one stopped
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    for (idx, op) in job.ops_iter:
      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
      opctx = _OpExecContext(op, idx, log_prefix, timeout_strategy_factory)

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])
        return opctx

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)
    else:
      # Ran out of opcodes without finding one to run
      raise errors.ProgrammerError("Called for a finished job")
892 be760ba8 Michael Hanselmann
893 be760ba8 Michael Hanselmann
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The opcode's result is reset and missing start timestamps on the opcode
    and the job are filled in.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the status or one of the timestamps was modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    op.result = None

    status_changed = (op.status == constants.OP_STATUS_QUEUED)
    if status_changed:
      op.status = constants.OP_STATUS_WAITLOCK

    op_started_now = (op.start_timestamp is None)
    if op_started_now:
      op.start_timestamp = TimeStampNow()

    job_started_now = (job.start_timestamp is None)
    if job_started_now:
      # The job starts together with the first opcode being started
      job.start_timestamp = op.start_timestamp

    assert op.status == constants.OP_STATUS_WAITLOCK

    return status_changed or op_started_now or job_started_now
928 be760ba8 Michael Hanselmann
929 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
930 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
931 be760ba8 Michael Hanselmann

932 be760ba8 Michael Hanselmann
    """
933 b80cc518 Michael Hanselmann
    op = opctx.op
934 b80cc518 Michael Hanselmann
935 be760ba8 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITLOCK
936 be760ba8 Michael Hanselmann
937 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
938 26d3fd2f Michael Hanselmann
939 be760ba8 Michael Hanselmann
    try:
940 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
941 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
942 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
943 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
944 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
945 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
946 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
947 9e49dfc5 Michael Hanselmann
948 9e49dfc5 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITLOCK,
949 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
950 9e49dfc5 Michael Hanselmann
951 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
952 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
953 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
954 9e49dfc5 Michael Hanselmann
955 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
956 5fd6b694 Michael Hanselmann
      return (constants.OP_STATUS_WAITLOCK, None)
957 be760ba8 Michael Hanselmann
    except CancelJob:
958 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
959 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
960 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
961 be760ba8 Michael Hanselmann
    except Exception, err: # pylint: disable-msg=W0703
962 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
963 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
964 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
965 be760ba8 Michael Hanselmann
    else:
966 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
967 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
968 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
969 be760ba8 Michael Hanselmann
970 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    A single call deals with at most one opcode: either the one left over
    from a previous round (stored in C{job.cur_opctx}) or the next one found
    by L{_FindNextOpcode}. The queue lock is held in shared mode, except
    while the opcode itself is being executed.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      # Is a previous opcode still pending?
      opctx = job.cur_opctx
      if opctx:
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check: none of the following opcodes may have started
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      if op.status not in (constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED):
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITLOCK)

        # Prepare to start opcode; only write to disk if something changed
        if self._MarkWaitlock(job, op):
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
        assert job.start_timestamp and op.start_timestamp

        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        # The queue lock must not be held while the opcode is executed. The
        # opcode is only updated once the lock has been acquired again.
        queue.release()
        try:
          (new_status, new_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = new_status
        op.result = new_result

        if op.status == constants.OP_STATUS_WAITLOCK:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITLOCK:
        if opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

        return False

      # Ensure all opcodes so far have been successful
      assert (opctx.index == 0 or
              compat.all(i.status == constants.OP_STATUS_SUCCESS
                         for i in job.ops[:opctx.index]))

      # Reset context
      job.cur_opctx = None

      if op.status == constants.OP_STATUS_SUCCESS:
        finalize = False

      elif op.status == constants.OP_STATUS_ERROR:
        # Ensure failed opcode has an exception as its result
        assert errors.GetEncodedError(job.ops[opctx.index].result)

        # All opcodes which did not run get an error result as well
        preceding_failed = errors.OpExecError("Preceding opcode failed")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(preceding_failed))
        finalize = True

        # Consistency check
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
                          errors.GetEncodedError(i.result)
                          for i in job.ops[opctx.index:])

      elif op.status == constants.OP_STATUS_CANCELING:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
        finalize = True

      elif op.status == constants.OP_STATUS_CANCELED:
        finalize = True

      else:
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)

      # Finalizing or last opcode?
      job_done = finalize or opctx.index == (opcount - 1)

      if job_done:
        # All opcodes have been run, finalize job
        job.end_timestamp = TimeStampNow()

      # Write to disk. If the job status is final, this is the final write
      # allowed. Once the file has been written, it can be archived anytime.
      queue.UpdateJobUnlocked(job)

      if not job_done:
        return False

      logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
      return True
    finally:
      queue.release()
1121 be760ba8 Michael Hanselmann
1122 be760ba8 Michael Hanselmann
1123 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread executing jobs handed out by the worker pool.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Runs one processing step of a job.

    This function is closely tied to the L{_QueuedJob} and L{_QueuedOpCode}
    classes; the processing itself is done by L{_JobProcessor}.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @raise workerpool.DeferTask: if the job processor returned a false value,
        so that the job is scheduled again with its current priority

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      """Renames this worker's task after the job and, if given, the opcode.

      """
      self.SetTaskName(self._GetWorkerName(job, op))

    # No opcode is being executed yet
    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Wrap opcode execution so the task name follows the running opcode
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               proc.ExecOpCode)

    finished = _JobProcessor(queue, execop_fn, job)()
    if not finished:
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Executes an opcode while the worker name includes its summary.

    The name is set before the opcode is executed and reset (by calling
    C{setname_fn(None)}) afterwards, even if the execution raised.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @param op: opcode, passed to both callables
    @return: whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      result = execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

    return result

  @staticmethod
  def _GetWorkerName(job, op):
    """Computes the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: str
    @return: C{Job<id>}, followed by C{/} and the opcode's tiny summary if an
        opcode was given

    """
    name = "Job%s" % job.id

    if op:
      name = "/".join([name, op.TinySummary()])

    return name
1182 0aeeb6e3 Michael Hanselmann
1183 e2715f69 Michael Hanselmann
1184 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool whose workers are L{_JobQueueWorker} instances.

  """
  def __init__(self, queue):
    """Creates the pool and remembers the queue it belongs to.

    @param queue: the job queue object; stored as C{self.queue}

    """
    init_fn = super(_JobQueueWorkerPool, self).__init__
    init_fn("Jq", JOBQUEUE_THREADS, _JobQueueWorker)

    self.queue = queue
1193 e2715f69 Michael Hanselmann
1194 e2715f69 Michael Hanselmann
1195 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1196 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1197 ea03467c Iustin Pop

1198 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1199 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1200 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1201 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1202 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1203 ea03467c Iustin Pop

1204 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1205 f1da30e6 Michael Hanselmann

1206 6c881c52 Iustin Pop
  Example::
1207 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1208 6c881c52 Iustin Pop
    @_RequireOpenQueue
1209 6c881c52 Iustin Pop
    def Example(self):
1210 6c881c52 Iustin Pop
      pass
1211 db37da70 Michael Hanselmann

1212 6c881c52 Iustin Pop
  """
1213 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1214 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
1215 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1216 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1217 6c881c52 Iustin Pop
  return wrapper
1218 db37da70 Michael Hanselmann
1219 db37da70 Michael Hanselmann
1220 6c881c52 Iustin Pop
class JobQueue(object):
1221 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1222 db37da70 Michael Hanselmann

1223 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1224 6c881c52 Iustin Pop

1225 6c881c52 Iustin Pop
  """
1226 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1227 db37da70 Michael Hanselmann
1228 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Initializes the job queue object and then inspects the jobs currently
    on disk, either to restart them (if they were queued) or to abort them
    (if they were already running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it, be it shared or exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Initial list of nodes: all master candidates except the master itself
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = jstore.CheckDrainFlag()

    # Setup worker pool; if inspecting the queue fails for whatever reason
    # the workers are terminated before the error is passed on
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise
1285 711b5124 Michael Hanselmann
1286 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    Queued jobs are enqueued again. Jobs which were waiting for locks are
    reset to queued and enqueued as well, while running or canceling jobs
    are marked as failed.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    interrupted = (constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_WAITLOCK,
                   constants.JOB_STATUS_CANCELING)

    to_restart = []

    job_ids = self._GetJobIDsUnlocked()
    total = len(job_ids)
    last_report = time.time()

    for idx, job_id in enumerate(job_ids):
      is_last = (idx == (total - 1))

      # Give an update every 1000 jobs or 10 seconds
      if idx % 1000 == 0 or time.time() >= (last_report + 10.0) or is_last:
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, total - 1, 100.0 * (idx + 1) / total)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        to_restart.append(job)
        continue

      if status not in interrupted:
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITLOCK:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        to_restart.append(job)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")

      self.UpdateJobUnlocked(job)

    if to_restart:
      logging.info("Restarting %s jobs", len(to_restart))
      self._EnqueueJobs(to_restart)

    logging.info("Job queue inspection finished")
1341 85f03e0d Michael Hanselmann
1342 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is a master
    candidate, all non-archived job files and the serial file are uploaded
    to it and the node is added to the replication list; otherwise it is
    dropped from that list. RPC failures are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    purge_err = rpc.RpcRunner.call_jobqueue_purge(node_name).fail_msg
    if purge_err:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, purge_err)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs, plus the current
    # serial file
    to_upload = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Read file content
      data = utils.ReadFile(path)

      upload = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  path, data)
      upload_err = upload[node_name].fail_msg
      if upload_err:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, node_name, upload_err)

    self._nodes[node_name] = node.primary_ip
1386 d2e03a33 Michael Hanselmann
1387 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Drops the node from the list of nodes the queue is replicated to; a
    node which isn't in that list is silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node wasn't registered, nothing to do
      pass
1397 23752136 Michael Hanselmann
1398 7e950d31 Iustin Pop
  @staticmethod
1399 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1400 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1401 ea03467c Iustin Pop

1402 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1403 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1404 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1405 ea03467c Iustin Pop

1406 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1407 ea03467c Iustin Pop
    @type nodes: list
1408 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1409 ea03467c Iustin Pop
    @type failmsg: str
1410 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1411 ea03467c Iustin Pop

1412 ea03467c Iustin Pop
    """
1413 e74798c1 Michael Hanselmann
    failed = []
1414 e74798c1 Michael Hanselmann
    success = []
1415 e74798c1 Michael Hanselmann
1416 e74798c1 Michael Hanselmann
    for node in nodes:
1417 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1418 c8457ce7 Iustin Pop
      if msg:
1419 e74798c1 Michael Hanselmann
        failed.append(node)
1420 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1421 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1422 c8457ce7 Iustin Pop
      else:
1423 c8457ce7 Iustin Pop
        success.append(node)
1424 e74798c1 Michael Hanselmann
1425 e74798c1 Michael Hanselmann
    # +1 for the master node
1426 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1427 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1428 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1429 e74798c1 Michael Hanselmann
1430 99aabbed Iustin Pop
  def _GetNodeIp(self):
1431 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1432 99aabbed Iustin Pop

1433 ea03467c Iustin Pop
    @rtype: (list, list)
1434 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1435 ea03467c Iustin Pop
        names and the second one with the node addresses
1436 ea03467c Iustin Pop

1437 99aabbed Iustin Pop
    """
1438 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1439 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1440 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1441 99aabbed Iustin Pop
    return name_list, addr_list
1442 99aabbed Iustin Pop
1443 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The file is written on the local node with the master daemon's uid and
    gid as owner; if requested, the new contents are then sent to all
    other nodes we have and the RPC results are checked.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data,
                    uid=ents.masterd_uid, gid=ents.masterd_gid)

    if not replicate:
      return

    (names, addrs) = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1465 23752136 Michael Hanselmann
1466 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are done in the local queue directory first (creating
    missing target directories); afterwards the same list of renames is
    sent to all the other nodes we have and the RPC results are checked.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
1484 abc1f2ce Michael Hanselmann
1485 7e950d31 Iustin Pop
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or is
        negative

    """
    is_numeric = isinstance(job_id, (int, long))
    if not is_numeric:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    formatted = str(job_id)
    return formatted
1505 85f03e0d Michael Hanselmann
1506 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into directories of C{JOBS_PER_ARCHIVE_DIRECTORY}
    jobs each; the directory name is the job ID divided by that number,
    rounded down.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: plain "/" would yield a float (and hence a
    # name like "0.0012") whenever true division is in effect
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1517 58b22b6e Michael Hanselmann
1518 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    highest serial is written to the serial file (and replicated) before
    the in-memory counter is updated.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: list with exactly C{count} formatted job identifiers, all of
        them greater than the previously recorded serial

    """
    assert count > 0

    # New numbers; the previously recorded serial itself must not be
    # returned again, hence start right after it
    first_new = self._last_serial + 1
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(first_new, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1543 f1da30e6 Michael Hanselmann
1544 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<id>} and located in the queue directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1555 f1da30e6 Michael Hanselmann
1556 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<id>} and located in the job's archive
    directory (see L{_GetArchiveDirectory}) below the queue archive
    directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          archive_subdir, file_name)
1568 0cb94105 Michael Hanselmann
1569 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs). Files in the queue directory whose name doesn't match
    L{_RE_JOB_FILE} are ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = [self._RE_JOB_FILE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]

    if sort:
      job_ids = utils.NiceSort(job_ids)

    return job_ids
1590 911a495b Iustin Pop
1591 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache. A job file which can't
    be parsed is moved to the archive (unless its path already equals the
    archive path) and treated like a missing job.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      return cached

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      src = self._GetJobPath(job_id)
      dst = self._GetArchivedJobPath(job_id)
      if src != dst:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(src, dst)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1627 162c8636 Guido Trotter
1628 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1629 162c8636 Guido Trotter
    """Load the given job file from disk.
1630 162c8636 Guido Trotter

1631 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1632 162c8636 Guido Trotter

1633 162c8636 Guido Trotter
    @type job_id: string
1634 162c8636 Guido Trotter
    @param job_id: job identifier
1635 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1636 162c8636 Guido Trotter
    @return: either None or the job object
1637 162c8636 Guido Trotter

1638 162c8636 Guido Trotter
    """
1639 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1640 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1641 f1da30e6 Michael Hanselmann
    try:
1642 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1643 162c8636 Guido Trotter
    except EnvironmentError, err:
1644 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1645 f1da30e6 Michael Hanselmann
        return None
1646 f1da30e6 Michael Hanselmann
      raise
1647 13998ef2 Michael Hanselmann
1648 94ed59a5 Iustin Pop
    try:
1649 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1650 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1651 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1652 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1653 94ed59a5 Iustin Pop
1654 ac0930b9 Iustin Pop
    return job
1655 f1da30e6 Michael Hanselmann
1656 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk, without raising on failure.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If the job file is corrupted or can't be read, the exception is logged
    and None is returned instead.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
1674 0f9c08dc Guido Trotter
1675 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1676 20571a26 Guido Trotter
    """Update the queue size.
1677 20571a26 Guido Trotter

1678 20571a26 Guido Trotter
    """
1679 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1680 20571a26 Guido Trotter
1681 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Set or clear the drain flag of the queue.

    The flag is handed to L{jstore.SetDrainFlag} and afterwards remembered
    in C{self._drained}, which is the value L{_SubmitJobUnlocked} checks
    before accepting a job.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    jstore.SetDrainFlag(drain_flag)
    # Remember the new state in memory as well
    self._drained = drain_flag
    return True
1695 3ccafd0e Iustin Pop
1696 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Build a new job, store it and register it in the memory cache.

    The job is written out via L{UpdateJobUnlocked}, counted in the cached
    queue size and added to C{self._memcache}. It is not handed to the
    worker pool here; the callers visible in this module (L{SubmitJob},
    L{SubmitManyJobs}) pass the returned job on to L{_EnqueueJobs}.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Refuse the whole job as soon as one opcode has a priority which is not
    # allowed at submission time
    valid_prios = constants.OP_PRIO_SUBMIT_VALID
    for (idx, op) in enumerate(job.ops):
      if op.priority in valid_prios:
        continue
      raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                " are %s" %
                                (idx, op.priority,
                                 utils.CommaJoin(valid_prios)))

    # Write to disk
    self.UpdateJobUnlocked(job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
1740 f1da30e6 Michael Hanselmann
1741 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Submit a single job made up of the given opcodes.

    A new job ID is allocated, the job is created through
    L{_SubmitJobUnlocked} and then passed to L{_EnqueueJobs}.

    @type ops: list
    @param ops: the opcodes that will become the new job
    @return: the ID allocated for the new job
    @see: L{_SubmitJobUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(1)
    job_id = new_ids[0]

    job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobs([job])

    return job_id
1752 2971c913 Iustin Pop
1753 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1754 2971c913 Iustin Pop
  @_RequireOpenQueue
1755 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1756 2971c913 Iustin Pop
    """Create and store multiple jobs.
1757 2971c913 Iustin Pop

1758 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1759 2971c913 Iustin Pop

1760 2971c913 Iustin Pop
    """
1761 2971c913 Iustin Pop
    results = []
1762 7b5c4a69 Michael Hanselmann
    added_jobs = []
1763 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1764 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1765 2971c913 Iustin Pop
      try:
1766 7b5c4a69 Michael Hanselmann
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1767 2971c913 Iustin Pop
        status = True
1768 7beb1e53 Guido Trotter
        data = job_id
1769 2971c913 Iustin Pop
      except errors.GenericError, err:
1770 98ed5092 Michael Hanselmann
        data = ("%s; opcodes %s" %
1771 98ed5092 Michael Hanselmann
                (err, utils.CommaJoin(op.Summary() for op in ops)))
1772 2971c913 Iustin Pop
        status = False
1773 2971c913 Iustin Pop
      results.append((status, data))
1774 7b5c4a69 Michael Hanselmann
1775 7b5c4a69 Michael Hanselmann
    self._EnqueueJobs(added_jobs)
1776 2971c913 Iustin Pop
1777 2971c913 Iustin Pop
    return results
1778 2971c913 Iustin Pop
1779 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
1780 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
1781 7b5c4a69 Michael Hanselmann

1782 7b5c4a69 Michael Hanselmann
    @type jobs: list
1783 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
1784 7b5c4a69 Michael Hanselmann

1785 7b5c4a69 Michael Hanselmann
    """
1786 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
1787 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
1788 7b5c4a69 Michael Hanselmann
1789 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Write a job's current state to its job file.

    This has to be called after a job was modified, so that the changes
    are written to disk and, if requested, replicated to the other nodes.
    The job is serialized to JSON and the result is passed, together with
    the C{replicate} flag, to L{_UpdateJobQueueFile}.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)

    logging.debug("Writing job %s to %s", job.id, job_file)
    self._UpdateJobQueueFile(job_file, serialized, replicate)
1807 ac0930b9 Iustin Pop
1808 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual waiting is delegated to a L{_WaitForJobChangesHelper}
    instance, which is given the path of the job file and a function
    loading the job through L{SafeLoadJobFromDisk}.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    wait_helper = _WaitForJobChangesHelper()
    reload_job = compat.partial(self.SafeLoadJobFromDisk, job_id)
    job_file = self._GetJobPath(job_id)

    return wait_helper(job_file, reload_job, fields, prev_job_info,
                       prev_log_serial, timeout)
1838 dfe57c22 Michael Hanselmann
1839 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet. The decision is
    taken by the job's own C{Cancel} method; if it reports success, the job
    is written back through L{UpdateJobUnlocked}.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}; C{success} is False with a "not found"
        message if the job could not be loaded, otherwise both values are
        those returned by the job's C{Cancel} method

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (cancelled, message) = job.Cancel()
    if cancelled:
      # Store the changed job
      self.UpdateJobUnlocked(job)

    return (cancelled, message)
1863 fbf0262f Michael Hanselmann
1864 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Jobs whose status is not in L{constants.JOBS_FINALIZED} are skipped.
    The files of all other jobs are renamed from their queue path to their
    archive path, after which the cached queue size is refreshed.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    finalized = []
    for job in jobs:
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        finalized.append(job)
      else:
        logging.debug("Job %s is not yet done", job.id)

    # Pairs of (current path, archive path) for each job to be archived
    renames = [(self._GetJobPath(job.id), self._GetArchivedJobPath(job.id))
               for job in finalized]

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in finalized))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()

    return len(finalized)
1899 78d12585 Michael Hanselmann
1900 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    The job is loaded and passed to L{_ArchiveJobsUnlocked}, which does
    the actual work.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived; False if the job could not be
        loaded

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      return self._ArchiveJobsUnlocked([job]) == 1

    logging.debug("Job %s not found", job_id)
    return False
1921 07cd723a Iustin Pop
1922 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and for jobs without a start timestamp
    the received timestamp. The special '-1' age will cause archival of
    all jobs (that are not running or queued).

    Jobs are examined one after the other until either all of them have
    been looked at or the timeout has expired. Jobs already selected for
    archival when the timeout expires are still archived.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int or float
    @param timeout: maximum time, in seconds, to spend examining jobs
    @rtype: tuple
    @return: C{(archived, unchecked)}: the number of jobs archived and the
        number of jobs which were not examined because the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after the timeout check, otherwise
      # the job at which we stop would be missing from the number of
      # unchecked jobs returned below
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Prefer the end timestamp, then the start one, then the received one
      if job.end_timestamp is not None:
        job_ts = job.end_timestamp
      elif job.start_timestamp is not None:
        job_ts = job.start_timestamp
      else:
        job_ts = job.received_timestamp

      # Only the first element of the timestamp is compared with the
      # current time (presumably the seconds part)
      if age == -1 or now - job_ts[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1977 07cd723a Iustin Pop
1978 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
    """Returns the requested fields for a list of jobs.

    If no job IDs are given, all jobs in the queue are listed and jobs
    which can't be loaded are left out. If job IDs are given, the result
    contains None in place of each job which can't be loaded.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    list_all = not job_ids
    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    result = []
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is None:
        # Unloadable jobs are only reported if they were asked for by ID
        if not list_all:
          result.append(None)
      else:
        result.append(job.GetInfo(fields))

    return result
2006 e2715f69 Michael Hanselmann
2007 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue: the
    workers of the pool are terminated first, then the queue's file lock
    is closed and the reference to it is dropped.

    """
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_filelock
    queue_lock.Close()
    self._queue_filelock = None