Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 6b9b18a2

History | View | Annotate | Download (58.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 7f93570a Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 f1da30e6 Michael Hanselmann
import errno
35 f1da30e6 Michael Hanselmann
import re
36 f1048938 Iustin Pop
import time
37 5685c1a5 Michael Hanselmann
import weakref
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 e2715f69 Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 1daae384 Iustin Pop
# Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
# NOTE(review): presumably the maximum number of jobs kept in a single
# archive directory - confirm against the archiving code
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# Names of the member locks, to be passed to the @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
67 99bd4f0a Guido Trotter
68 498ae1cc Iustin Pop
69 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to abort the execution of a job being canceled.

  """
73 fbf0262f Michael Hanselmann
74 fbf0262f Michael Hanselmann
75 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
83 70552c46 Michael Hanselmann
84 70552c46 Michael Hanselmann
85 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Wraps an opcode object together with its execution state.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar priority: the current priority of the opcode
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

    # No timestamps are known for a freshly queued opcode
    self.start_timestamp = self.exec_timestamp = self.end_timestamp = None

    # This is only the initial priority; it might change during the
    # lifetime of this opcode
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    The constructor is bypassed; all attributes are taken from C{state}.
    Timestamps missing from the state default to C{None} and a missing
    priority defaults to L{constants.OP_PRIO_DEFAULT}.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])

    # Mandatory entries
    for name in ("status", "result", "log"):
      setattr(obj, name, state[name])

    # Optional entries
    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(obj, name, state.get(name))

    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)

    return obj

  def Serialize(self):
    """Converts this _QueuedOpCode into its serialized form.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    state = {
      "input": self.input.__getstate__(),
      }

    # All remaining attributes are stored under their own name
    for name in ("status", "result", "log", "start_timestamp",
                 "exec_timestamp", "end_timestamp", "priority"):
      state[name] = getattr(self, name)

    return state
158 f1048938 Iustin Pop
159 e2715f69 Michael Hanselmann
160 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: the serial number of the most recent log entry (0 if
      there is none yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar ops_iter: in-memory only state (not serialized), initially None
  @ivar cur_opctx: in-memory only state (not serialized), initially None

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise errors.GenericError: if no opcodes were given

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    These attributes are never serialized, hence they need to be reset
    both for new and for restored jobs.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    # The log serial is not stored; it's recomputed as the highest serial
    # found in any of the opcode logs
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Job-level constant, like in all other branches; a job status must
        # never be expressed through an opcode status constant
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # The first element of each log entry is its serial number
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    # Becomes False once the first non-finalized opcode has been marked;
    # finalized opcodes are only expected before that point
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
452 099b2870 Michael Hanselmann
453 f1048938 Iustin Pop
454 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks updating a job and its opcode while the opcode is executed.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    (self._queue, self._job, self._op) = (queue, job, op)

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    @raise CancelJob: if the opcode is in the canceling state

    """
    if self._op.status != constants.OP_STATUS_CANCELING:
      # Nobody asked for cancellation
      return

    logging.debug("Canceling opcode")
    raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    op = self._op
    job = self._job

    assert op in job.ops
    assert op.status in (constants.OP_STATUS_WAITLOCK,
                         constants.OP_STATUS_CANCELING)

    # A pending cancel request takes precedence over starting the opcode
    self._CheckCancel()

    logging.debug("Opcode is now running")

    op.status = constants.OP_STATUS_RUNNING
    op.exec_timestamp = TimeStampNow()

    # Finally write out the changed job via the queue
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    job = self._job

    # Each log entry gets the next serial number of the job
    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)

    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Takes either just the message, in which case the log type defaults to
    L{constants.ELOG_MESSAGE}, or the log type followed by the message.

    """
    assert len(args) < 3

    if len(args) != 1:
      (log_type, log_msg) = args
    else:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])

    # The time is split to make serialization easier and not lose
    # precision.
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    @raise CancelJob: if the opcode is in the canceling state

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Abort right away if a cancel request is pending
    self._CheckCancel()
543 dc1e2262 Michael Hanselmann
544 031a3e57 Michael Hanselmann
545 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
546 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
547 989a8bee Michael Hanselmann
    """Initializes this class.
548 6c2549d6 Guido Trotter

549 989a8bee Michael Hanselmann
    @type fields: list of strings
550 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
551 989a8bee Michael Hanselmann
    @type prev_job_info: string
552 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
553 989a8bee Michael Hanselmann
    @type prev_log_serial: string
554 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
555 6c2549d6 Guido Trotter

556 989a8bee Michael Hanselmann
    """
557 989a8bee Michael Hanselmann
    self._fields = fields
558 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
559 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
560 6c2549d6 Guido Trotter
561 989a8bee Michael Hanselmann
  def __call__(self, job):
562 989a8bee Michael Hanselmann
    """Checks whether job has changed.
563 6c2549d6 Guido Trotter

564 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
565 989a8bee Michael Hanselmann
    @param job: Job object
566 6c2549d6 Guido Trotter

567 6c2549d6 Guido Trotter
    """
568 989a8bee Michael Hanselmann
    status = job.CalcStatus()
569 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
570 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
571 6c2549d6 Guido Trotter
572 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
573 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
574 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
575 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
576 6c2549d6 Guido Trotter
    # significantly different.
577 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
578 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
579 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
580 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
581 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
582 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
583 6c2549d6 Guido Trotter
584 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
585 6c2549d6 Guido Trotter
    # no changes.
586 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
587 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
588 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_WAITLOCK) or
589 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
590 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
591 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
592 989a8bee Michael Hanselmann
      return (job_info, log_entries)
593 6c2549d6 Guido Trotter
594 989a8bee Michael Hanselmann
    return None
595 989a8bee Michael Hanselmann
596 989a8bee Michael Hanselmann
597 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
598 989a8bee Michael Hanselmann
  def __init__(self, filename):
599 989a8bee Michael Hanselmann
    """Initializes this class.
600 989a8bee Michael Hanselmann

601 989a8bee Michael Hanselmann
    @type filename: string
602 989a8bee Michael Hanselmann
    @param filename: Path to job file
603 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
604 6c2549d6 Guido Trotter

605 989a8bee Michael Hanselmann
    """
606 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
607 989a8bee Michael Hanselmann
    self._inotify_handler = \
608 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
609 989a8bee Michael Hanselmann
    self._notifier = \
610 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
611 989a8bee Michael Hanselmann
    try:
612 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
613 989a8bee Michael Hanselmann
    except Exception:
614 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
615 989a8bee Michael Hanselmann
      self._notifier.stop()
616 989a8bee Michael Hanselmann
      raise
617 989a8bee Michael Hanselmann
618 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
619 989a8bee Michael Hanselmann
    """Callback for inotify.
620 989a8bee Michael Hanselmann

621 989a8bee Michael Hanselmann
    """
622 6c2549d6 Guido Trotter
    if not notifier_enabled:
623 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
624 989a8bee Michael Hanselmann
625 989a8bee Michael Hanselmann
  def Wait(self, timeout):
626 989a8bee Michael Hanselmann
    """Waits for the job file to change.
627 989a8bee Michael Hanselmann

628 989a8bee Michael Hanselmann
    @type timeout: float
629 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
630 989a8bee Michael Hanselmann
    @return: Whether there have been events
631 989a8bee Michael Hanselmann

632 989a8bee Michael Hanselmann
    """
633 989a8bee Michael Hanselmann
    assert timeout >= 0
634 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
635 989a8bee Michael Hanselmann
    if have_events:
636 989a8bee Michael Hanselmann
      self._notifier.read_events()
637 989a8bee Michael Hanselmann
    self._notifier.process_events()
638 989a8bee Michael Hanselmann
    return have_events
639 989a8bee Michael Hanselmann
640 989a8bee Michael Hanselmann
  def Close(self):
641 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
642 989a8bee Michael Hanselmann

643 989a8bee Michael Hanselmann
    """
644 989a8bee Michael Hanselmann
    self._notifier.stop()
645 989a8bee Michael Hanselmann
646 989a8bee Michael Hanselmann
647 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
648 989a8bee Michael Hanselmann
  def __init__(self, filename):
649 989a8bee Michael Hanselmann
    """Initializes this class.
650 989a8bee Michael Hanselmann

651 989a8bee Michael Hanselmann
    @type filename: string
652 989a8bee Michael Hanselmann
    @param filename: Path to job file
653 989a8bee Michael Hanselmann

654 989a8bee Michael Hanselmann
    """
655 989a8bee Michael Hanselmann
    self._filewaiter = None
656 989a8bee Michael Hanselmann
    self._filename = filename
657 6c2549d6 Guido Trotter
658 989a8bee Michael Hanselmann
  def Wait(self, timeout):
659 989a8bee Michael Hanselmann
    """Waits for a job to change.
660 6c2549d6 Guido Trotter

661 989a8bee Michael Hanselmann
    @type timeout: float
662 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
663 989a8bee Michael Hanselmann
    @return: Whether there have been events
664 989a8bee Michael Hanselmann

665 989a8bee Michael Hanselmann
    """
666 989a8bee Michael Hanselmann
    if self._filewaiter:
667 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
668 989a8bee Michael Hanselmann
669 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
670 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
671 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
672 989a8bee Michael Hanselmann
    # race condition.
673 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
674 989a8bee Michael Hanselmann
675 989a8bee Michael Hanselmann
    return True
676 989a8bee Michael Hanselmann
677 989a8bee Michael Hanselmann
  def Close(self):
678 989a8bee Michael Hanselmann
    """Closes underlying waiter.
679 989a8bee Michael Hanselmann

680 989a8bee Michael Hanselmann
    """
681 989a8bee Michael Hanselmann
    if self._filewaiter:
682 989a8bee Michael Hanselmann
      self._filewaiter.Close()
683 989a8bee Michael Hanselmann
684 989a8bee Michael Hanselmann
685 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
686 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
687 989a8bee Michael Hanselmann

688 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
689 989a8bee Michael Hanselmann
  the current job status has changed.
690 989a8bee Michael Hanselmann

691 989a8bee Michael Hanselmann
  """
692 989a8bee Michael Hanselmann
  @staticmethod
693 989a8bee Michael Hanselmann
  def _CheckForChanges(job_load_fn, check_fn):
694 989a8bee Michael Hanselmann
    job = job_load_fn()
695 989a8bee Michael Hanselmann
    if not job:
696 989a8bee Michael Hanselmann
      raise errors.JobLost()
697 989a8bee Michael Hanselmann
698 989a8bee Michael Hanselmann
    result = check_fn(job)
699 989a8bee Michael Hanselmann
    if result is None:
700 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
701 989a8bee Michael Hanselmann
702 989a8bee Michael Hanselmann
    return result
703 989a8bee Michael Hanselmann
704 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
705 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
706 989a8bee Michael Hanselmann
    """Waits for changes on a job.
707 989a8bee Michael Hanselmann

708 989a8bee Michael Hanselmann
    @type filename: string
709 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
710 989a8bee Michael Hanselmann
    @type job_load_fn: callable
711 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
712 989a8bee Michael Hanselmann
    @type fields: list of strings
713 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
714 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
715 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
716 989a8bee Michael Hanselmann
    @type prev_log_serial: int
717 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
718 989a8bee Michael Hanselmann
    @type timeout: float
719 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
720 989a8bee Michael Hanselmann

721 989a8bee Michael Hanselmann
    """
722 6c2549d6 Guido Trotter
    try:
723 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
725 989a8bee Michael Hanselmann
      try:
726 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
727 989a8bee Michael Hanselmann
                                          job_load_fn, check_fn),
728 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
729 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
730 989a8bee Michael Hanselmann
      finally:
731 989a8bee Michael Hanselmann
        waiter.Close()
732 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
733 6c2549d6 Guido Trotter
      return None
734 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
735 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
736 6c2549d6 Guido Trotter
737 6c2549d6 Guido Trotter
738 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  Exceptions which are not instances of L{errors.GenericError} are converted
  to an L{errors.OpExecError} carrying their string representation before
  being encoded.

  """
  if not isinstance(err, errors.GenericError):
    err = errors.OpExecError(str(err))

  return errors.EncodeException(err)
748 6760e4ed Michael Hanselmann
749 6760e4ed Michael Hanselmann
750 26d3fd2f Michael Hanselmann
class _TimeoutStrategyWrapper:
751 26d3fd2f Michael Hanselmann
  def __init__(self, fn):
752 26d3fd2f Michael Hanselmann
    """Initializes this class.
753 26d3fd2f Michael Hanselmann

754 26d3fd2f Michael Hanselmann
    """
755 26d3fd2f Michael Hanselmann
    self._fn = fn
756 26d3fd2f Michael Hanselmann
    self._next = None
757 26d3fd2f Michael Hanselmann
758 26d3fd2f Michael Hanselmann
  def _Advance(self):
759 26d3fd2f Michael Hanselmann
    """Gets the next timeout if necessary.
760 26d3fd2f Michael Hanselmann

761 26d3fd2f Michael Hanselmann
    """
762 26d3fd2f Michael Hanselmann
    if self._next is None:
763 26d3fd2f Michael Hanselmann
      self._next = self._fn()
764 26d3fd2f Michael Hanselmann
765 26d3fd2f Michael Hanselmann
  def Peek(self):
766 26d3fd2f Michael Hanselmann
    """Returns the next timeout.
767 26d3fd2f Michael Hanselmann

768 26d3fd2f Michael Hanselmann
    """
769 26d3fd2f Michael Hanselmann
    self._Advance()
770 26d3fd2f Michael Hanselmann
    return self._next
771 26d3fd2f Michael Hanselmann
772 26d3fd2f Michael Hanselmann
  def Next(self):
773 26d3fd2f Michael Hanselmann
    """Returns the current timeout and advances the internal state.
774 26d3fd2f Michael Hanselmann

775 26d3fd2f Michael Hanselmann
    """
776 26d3fd2f Michael Hanselmann
    self._Advance()
777 26d3fd2f Michael Hanselmann
    result = self._next
778 26d3fd2f Michael Hanselmann
    self._next = None
779 26d3fd2f Michael Hanselmann
    return result
780 26d3fd2f Michael Hanselmann
781 26d3fd2f Michael Hanselmann
782 b80cc518 Michael Hanselmann
class _OpExecContext:
783 26d3fd2f Michael Hanselmann
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784 b80cc518 Michael Hanselmann
    """Initializes this class.
785 b80cc518 Michael Hanselmann

786 b80cc518 Michael Hanselmann
    """
787 b80cc518 Michael Hanselmann
    self.op = op
788 b80cc518 Michael Hanselmann
    self.index = index
789 b80cc518 Michael Hanselmann
    self.log_prefix = log_prefix
790 b80cc518 Michael Hanselmann
    self.summary = op.input.Summary()
791 b80cc518 Michael Hanselmann
792 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = timeout_strategy_factory
793 26d3fd2f Michael Hanselmann
    self._ResetTimeoutStrategy()
794 26d3fd2f Michael Hanselmann
795 26d3fd2f Michael Hanselmann
  def _ResetTimeoutStrategy(self):
796 26d3fd2f Michael Hanselmann
    """Creates a new timeout strategy.
797 26d3fd2f Michael Hanselmann

798 26d3fd2f Michael Hanselmann
    """
799 26d3fd2f Michael Hanselmann
    self._timeout_strategy = \
800 26d3fd2f Michael Hanselmann
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
801 26d3fd2f Michael Hanselmann
802 26d3fd2f Michael Hanselmann
  def CheckPriorityIncrease(self):
803 26d3fd2f Michael Hanselmann
    """Checks whether priority can and should be increased.
804 26d3fd2f Michael Hanselmann

805 26d3fd2f Michael Hanselmann
    Called when locks couldn't be acquired.
806 26d3fd2f Michael Hanselmann

807 26d3fd2f Michael Hanselmann
    """
808 26d3fd2f Michael Hanselmann
    op = self.op
809 26d3fd2f Michael Hanselmann
810 26d3fd2f Michael Hanselmann
    # Exhausted all retries and next round should not use blocking acquire
811 26d3fd2f Michael Hanselmann
    # for locks?
812 26d3fd2f Michael Hanselmann
    if (self._timeout_strategy.Peek() is None and
813 26d3fd2f Michael Hanselmann
        op.priority > constants.OP_PRIO_HIGHEST):
814 26d3fd2f Michael Hanselmann
      logging.debug("Increasing priority")
815 26d3fd2f Michael Hanselmann
      op.priority -= 1
816 26d3fd2f Michael Hanselmann
      self._ResetTimeoutStrategy()
817 26d3fd2f Michael Hanselmann
      return True
818 26d3fd2f Michael Hanselmann
819 26d3fd2f Michael Hanselmann
    return False
820 26d3fd2f Michael Hanselmann
821 26d3fd2f Michael Hanselmann
  def GetNextLockTimeout(self):
822 26d3fd2f Michael Hanselmann
    """Returns the next lock acquire timeout.
823 26d3fd2f Michael Hanselmann

824 26d3fd2f Michael Hanselmann
    """
825 26d3fd2f Michael Hanselmann
    return self._timeout_strategy.Next()
826 26d3fd2f Michael Hanselmann
827 b80cc518 Michael Hanselmann
828 be760ba8 Michael Hanselmann
class _JobProcessor(object):
829 26d3fd2f Michael Hanselmann
  def __init__(self, queue, opexec_fn, job,
830 26d3fd2f Michael Hanselmann
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831 be760ba8 Michael Hanselmann
    """Initializes this class.
832 be760ba8 Michael Hanselmann

833 be760ba8 Michael Hanselmann
    """
834 be760ba8 Michael Hanselmann
    self.queue = queue
835 be760ba8 Michael Hanselmann
    self.opexec_fn = opexec_fn
836 be760ba8 Michael Hanselmann
    self.job = job
837 26d3fd2f Michael Hanselmann
    self._timeout_strategy_factory = _timeout_strategy_factory
838 be760ba8 Michael Hanselmann
839 be760ba8 Michael Hanselmann
  @staticmethod
840 26d3fd2f Michael Hanselmann
  def _FindNextOpcode(job, timeout_strategy_factory):
841 be760ba8 Michael Hanselmann
    """Locates the next opcode to run.
842 be760ba8 Michael Hanselmann

843 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
844 be760ba8 Michael Hanselmann
    @param job: Job object
845 26d3fd2f Michael Hanselmann
    @param timeout_strategy_factory: Callable to create new timeout strategy
846 be760ba8 Michael Hanselmann

847 be760ba8 Michael Hanselmann
    """
848 be760ba8 Michael Hanselmann
    # Create some sort of a cache to speed up locating next opcode for future
849 be760ba8 Michael Hanselmann
    # lookups
850 be760ba8 Michael Hanselmann
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
851 be760ba8 Michael Hanselmann
    # pending and one for processed ops.
852 03b63608 Michael Hanselmann
    if job.ops_iter is None:
853 03b63608 Michael Hanselmann
      job.ops_iter = enumerate(job.ops)
854 be760ba8 Michael Hanselmann
855 be760ba8 Michael Hanselmann
    # Find next opcode to run
856 be760ba8 Michael Hanselmann
    while True:
857 be760ba8 Michael Hanselmann
      try:
858 03b63608 Michael Hanselmann
        (idx, op) = job.ops_iter.next()
859 be760ba8 Michael Hanselmann
      except StopIteration:
860 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for a finished job")
861 be760ba8 Michael Hanselmann
862 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_RUNNING:
863 be760ba8 Michael Hanselmann
        # Found an opcode already marked as running
864 be760ba8 Michael Hanselmann
        raise errors.ProgrammerError("Called for job marked as running")
865 be760ba8 Michael Hanselmann
866 26d3fd2f Michael Hanselmann
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867 26d3fd2f Michael Hanselmann
                             timeout_strategy_factory)
868 be760ba8 Michael Hanselmann
869 be760ba8 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELED:
870 be760ba8 Michael Hanselmann
        # Cancelled jobs are handled by the caller
871 be760ba8 Michael Hanselmann
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872 be760ba8 Michael Hanselmann
                              for i in job.ops[idx:])
873 be760ba8 Michael Hanselmann
874 be760ba8 Michael Hanselmann
      elif op.status in constants.OPS_FINALIZED:
875 be760ba8 Michael Hanselmann
        # This is a job that was partially completed before master daemon
876 be760ba8 Michael Hanselmann
        # shutdown, so it can be expected that some opcodes are already
877 be760ba8 Michael Hanselmann
        # completed successfully (if any did error out, then the whole job
878 be760ba8 Michael Hanselmann
        # should have been aborted and not resubmitted for processing).
879 be760ba8 Michael Hanselmann
        logging.info("%s: opcode %s already processed, skipping",
880 b80cc518 Michael Hanselmann
                     opctx.log_prefix, opctx.summary)
881 be760ba8 Michael Hanselmann
        continue
882 be760ba8 Michael Hanselmann
883 b80cc518 Michael Hanselmann
      return opctx
884 be760ba8 Michael Hanselmann
885 be760ba8 Michael Hanselmann
  @staticmethod
886 be760ba8 Michael Hanselmann
  def _MarkWaitlock(job, op):
887 be760ba8 Michael Hanselmann
    """Marks an opcode as waiting for locks.
888 be760ba8 Michael Hanselmann

889 be760ba8 Michael Hanselmann
    The job's start timestamp is also set if necessary.
890 be760ba8 Michael Hanselmann

891 be760ba8 Michael Hanselmann
    @type job: L{_QueuedJob}
892 be760ba8 Michael Hanselmann
    @param job: Job object
893 a38e8674 Michael Hanselmann
    @type op: L{_QueuedOpCode}
894 a38e8674 Michael Hanselmann
    @param op: Opcode object
895 be760ba8 Michael Hanselmann

896 be760ba8 Michael Hanselmann
    """
897 be760ba8 Michael Hanselmann
    assert op in job.ops
898 5fd6b694 Michael Hanselmann
    assert op.status in (constants.OP_STATUS_QUEUED,
899 5fd6b694 Michael Hanselmann
                         constants.OP_STATUS_WAITLOCK)
900 5fd6b694 Michael Hanselmann
901 5fd6b694 Michael Hanselmann
    update = False
902 be760ba8 Michael Hanselmann
903 be760ba8 Michael Hanselmann
    op.result = None
904 5fd6b694 Michael Hanselmann
905 5fd6b694 Michael Hanselmann
    if op.status == constants.OP_STATUS_QUEUED:
906 5fd6b694 Michael Hanselmann
      op.status = constants.OP_STATUS_WAITLOCK
907 5fd6b694 Michael Hanselmann
      update = True
908 5fd6b694 Michael Hanselmann
909 5fd6b694 Michael Hanselmann
    if op.start_timestamp is None:
910 5fd6b694 Michael Hanselmann
      op.start_timestamp = TimeStampNow()
911 5fd6b694 Michael Hanselmann
      update = True
912 be760ba8 Michael Hanselmann
913 be760ba8 Michael Hanselmann
    if job.start_timestamp is None:
914 be760ba8 Michael Hanselmann
      job.start_timestamp = op.start_timestamp
915 5fd6b694 Michael Hanselmann
      update = True
916 5fd6b694 Michael Hanselmann
917 5fd6b694 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITLOCK
918 5fd6b694 Michael Hanselmann
919 5fd6b694 Michael Hanselmann
    return update
920 be760ba8 Michael Hanselmann
921 b80cc518 Michael Hanselmann
  def _ExecOpCodeUnlocked(self, opctx):
922 be760ba8 Michael Hanselmann
    """Processes one opcode and returns the result.
923 be760ba8 Michael Hanselmann

924 be760ba8 Michael Hanselmann
    """
925 b80cc518 Michael Hanselmann
    op = opctx.op
926 b80cc518 Michael Hanselmann
927 be760ba8 Michael Hanselmann
    assert op.status == constants.OP_STATUS_WAITLOCK
928 be760ba8 Michael Hanselmann
929 26d3fd2f Michael Hanselmann
    timeout = opctx.GetNextLockTimeout()
930 26d3fd2f Michael Hanselmann
931 be760ba8 Michael Hanselmann
    try:
932 be760ba8 Michael Hanselmann
      # Make sure not to hold queue lock while calling ExecOpCode
933 be760ba8 Michael Hanselmann
      result = self.opexec_fn(op.input,
934 26d3fd2f Michael Hanselmann
                              _OpExecCallbacks(self.queue, self.job, op),
935 f23db633 Michael Hanselmann
                              timeout=timeout, priority=op.priority)
936 26d3fd2f Michael Hanselmann
    except mcpu.LockAcquireTimeout:
937 26d3fd2f Michael Hanselmann
      assert timeout is not None, "Received timeout for blocking acquire"
938 26d3fd2f Michael Hanselmann
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
939 9e49dfc5 Michael Hanselmann
940 9e49dfc5 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_WAITLOCK,
941 9e49dfc5 Michael Hanselmann
                           constants.OP_STATUS_CANCELING)
942 9e49dfc5 Michael Hanselmann
943 9e49dfc5 Michael Hanselmann
      # Was job cancelled while we were waiting for the lock?
944 9e49dfc5 Michael Hanselmann
      if op.status == constants.OP_STATUS_CANCELING:
945 9e49dfc5 Michael Hanselmann
        return (constants.OP_STATUS_CANCELING, None)
946 9e49dfc5 Michael Hanselmann
947 5fd6b694 Michael Hanselmann
      # Stay in waitlock while trying to re-acquire lock
948 5fd6b694 Michael Hanselmann
      return (constants.OP_STATUS_WAITLOCK, None)
949 be760ba8 Michael Hanselmann
    except CancelJob:
950 b80cc518 Michael Hanselmann
      logging.exception("%s: Canceling job", opctx.log_prefix)
951 be760ba8 Michael Hanselmann
      assert op.status == constants.OP_STATUS_CANCELING
952 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_CANCELING, None)
953 be760ba8 Michael Hanselmann
    except Exception, err: # pylint: disable-msg=W0703
954 b80cc518 Michael Hanselmann
      logging.exception("%s: Caught exception in %s",
955 b80cc518 Michael Hanselmann
                        opctx.log_prefix, opctx.summary)
956 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
957 be760ba8 Michael Hanselmann
    else:
958 b80cc518 Michael Hanselmann
      logging.debug("%s: %s successful",
959 b80cc518 Michael Hanselmann
                    opctx.log_prefix, opctx.summary)
960 be760ba8 Michael Hanselmann
      return (constants.OP_STATUS_SUCCESS, result)
961 be760ba8 Michael Hanselmann
962 26d3fd2f Michael Hanselmann
  def __call__(self, _nextop_fn=None):
963 be760ba8 Michael Hanselmann
    """Continues execution of a job.
964 be760ba8 Michael Hanselmann

965 26d3fd2f Michael Hanselmann
    @param _nextop_fn: Callback function for tests
966 be760ba8 Michael Hanselmann
    @rtype: bool
967 be760ba8 Michael Hanselmann
    @return: True if job is finished, False if processor needs to be called
968 be760ba8 Michael Hanselmann
             again
969 be760ba8 Michael Hanselmann

970 be760ba8 Michael Hanselmann
    """
971 be760ba8 Michael Hanselmann
    queue = self.queue
972 be760ba8 Michael Hanselmann
    job = self.job
973 be760ba8 Michael Hanselmann
974 be760ba8 Michael Hanselmann
    logging.debug("Processing job %s", job.id)
975 be760ba8 Michael Hanselmann
976 be760ba8 Michael Hanselmann
    queue.acquire(shared=1)
977 be760ba8 Michael Hanselmann
    try:
978 be760ba8 Michael Hanselmann
      opcount = len(job.ops)
979 be760ba8 Michael Hanselmann
980 26d3fd2f Michael Hanselmann
      # Is a previous opcode still pending?
981 26d3fd2f Michael Hanselmann
      if job.cur_opctx:
982 26d3fd2f Michael Hanselmann
        opctx = job.cur_opctx
983 5fd6b694 Michael Hanselmann
        job.cur_opctx = None
984 26d3fd2f Michael Hanselmann
      else:
985 26d3fd2f Michael Hanselmann
        if __debug__ and _nextop_fn:
986 26d3fd2f Michael Hanselmann
          _nextop_fn()
987 26d3fd2f Michael Hanselmann
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
988 26d3fd2f Michael Hanselmann
989 b80cc518 Michael Hanselmann
      op = opctx.op
990 be760ba8 Michael Hanselmann
991 be760ba8 Michael Hanselmann
      # Consistency check
992 be760ba8 Michael Hanselmann
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
993 30c945d0 Michael Hanselmann
                                     constants.OP_STATUS_CANCELING,
994 be760ba8 Michael Hanselmann
                                     constants.OP_STATUS_CANCELED)
995 5fd6b694 Michael Hanselmann
                        for i in job.ops[opctx.index + 1:])
996 be760ba8 Michael Hanselmann
997 be760ba8 Michael Hanselmann
      assert op.status in (constants.OP_STATUS_QUEUED,
998 be760ba8 Michael Hanselmann
                           constants.OP_STATUS_WAITLOCK,
999 30c945d0 Michael Hanselmann
                           constants.OP_STATUS_CANCELING,
1000 be760ba8 Michael Hanselmann
                           constants.OP_STATUS_CANCELED)
1001 be760ba8 Michael Hanselmann
1002 26d3fd2f Michael Hanselmann
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1003 26d3fd2f Michael Hanselmann
              op.priority >= constants.OP_PRIO_HIGHEST)
1004 26d3fd2f Michael Hanselmann
1005 30c945d0 Michael Hanselmann
      if op.status not in (constants.OP_STATUS_CANCELING,
1006 30c945d0 Michael Hanselmann
                           constants.OP_STATUS_CANCELED):
1007 30c945d0 Michael Hanselmann
        assert op.status in (constants.OP_STATUS_QUEUED,
1008 30c945d0 Michael Hanselmann
                             constants.OP_STATUS_WAITLOCK)
1009 30c945d0 Michael Hanselmann
1010 be760ba8 Michael Hanselmann
        # Prepare to start opcode
1011 5fd6b694 Michael Hanselmann
        if self._MarkWaitlock(job, op):
1012 5fd6b694 Michael Hanselmann
          # Write to disk
1013 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1014 be760ba8 Michael Hanselmann
1015 be760ba8 Michael Hanselmann
        assert op.status == constants.OP_STATUS_WAITLOCK
1016 be760ba8 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1017 5fd6b694 Michael Hanselmann
        assert job.start_timestamp and op.start_timestamp
1018 be760ba8 Michael Hanselmann
1019 b80cc518 Michael Hanselmann
        logging.info("%s: opcode %s waiting for locks",
1020 b80cc518 Michael Hanselmann
                     opctx.log_prefix, opctx.summary)
1021 be760ba8 Michael Hanselmann
1022 be760ba8 Michael Hanselmann
        queue.release()
1023 be760ba8 Michael Hanselmann
        try:
1024 b80cc518 Michael Hanselmann
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1025 be760ba8 Michael Hanselmann
        finally:
1026 be760ba8 Michael Hanselmann
          queue.acquire(shared=1)
1027 be760ba8 Michael Hanselmann
1028 be760ba8 Michael Hanselmann
        op.status = op_status
1029 be760ba8 Michael Hanselmann
        op.result = op_result
1030 be760ba8 Michael Hanselmann
1031 5fd6b694 Michael Hanselmann
        if op.status == constants.OP_STATUS_WAITLOCK:
1032 26d3fd2f Michael Hanselmann
          # Couldn't get locks in time
1033 26d3fd2f Michael Hanselmann
          assert not op.end_timestamp
1034 be760ba8 Michael Hanselmann
        else:
1035 26d3fd2f Michael Hanselmann
          # Finalize opcode
1036 26d3fd2f Michael Hanselmann
          op.end_timestamp = TimeStampNow()
1037 be760ba8 Michael Hanselmann
1038 26d3fd2f Michael Hanselmann
          if op.status == constants.OP_STATUS_CANCELING:
1039 26d3fd2f Michael Hanselmann
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1040 26d3fd2f Michael Hanselmann
                                  for i in job.ops[opctx.index:])
1041 26d3fd2f Michael Hanselmann
          else:
1042 26d3fd2f Michael Hanselmann
            assert op.status in constants.OPS_FINALIZED
1043 be760ba8 Michael Hanselmann
1044 5fd6b694 Michael Hanselmann
      if op.status == constants.OP_STATUS_WAITLOCK:
1045 be760ba8 Michael Hanselmann
        finalize = False
1046 be760ba8 Michael Hanselmann
1047 5fd6b694 Michael Hanselmann
        if opctx.CheckPriorityIncrease():
1048 5fd6b694 Michael Hanselmann
          # Priority was changed, need to update on-disk file
1049 5fd6b694 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
1050 be760ba8 Michael Hanselmann
1051 26d3fd2f Michael Hanselmann
        # Keep around for another round
1052 26d3fd2f Michael Hanselmann
        job.cur_opctx = opctx
1053 be760ba8 Michael Hanselmann
1054 26d3fd2f Michael Hanselmann
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1055 26d3fd2f Michael Hanselmann
                op.priority >= constants.OP_PRIO_HIGHEST)
1056 be760ba8 Michael Hanselmann
1057 26d3fd2f Michael Hanselmann
        # In no case must the status be finalized here
1058 5fd6b694 Michael Hanselmann
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1059 be760ba8 Michael Hanselmann
1060 be760ba8 Michael Hanselmann
      else:
1061 26d3fd2f Michael Hanselmann
        # Ensure all opcodes so far have been successful
1062 26d3fd2f Michael Hanselmann
        assert (opctx.index == 0 or
1063 26d3fd2f Michael Hanselmann
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1064 26d3fd2f Michael Hanselmann
                           for i in job.ops[:opctx.index]))
1065 26d3fd2f Michael Hanselmann
1066 26d3fd2f Michael Hanselmann
        # Reset context
1067 26d3fd2f Michael Hanselmann
        job.cur_opctx = None
1068 26d3fd2f Michael Hanselmann
1069 26d3fd2f Michael Hanselmann
        if op.status == constants.OP_STATUS_SUCCESS:
1070 26d3fd2f Michael Hanselmann
          finalize = False
1071 26d3fd2f Michael Hanselmann
1072 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_ERROR:
1073 26d3fd2f Michael Hanselmann
          # Ensure failed opcode has an exception as its result
1074 26d3fd2f Michael Hanselmann
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1075 26d3fd2f Michael Hanselmann
1076 26d3fd2f Michael Hanselmann
          to_encode = errors.OpExecError("Preceding opcode failed")
1077 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1078 26d3fd2f Michael Hanselmann
                                _EncodeOpError(to_encode))
1079 26d3fd2f Michael Hanselmann
          finalize = True
1080 be760ba8 Michael Hanselmann
1081 26d3fd2f Michael Hanselmann
          # Consistency check
1082 26d3fd2f Michael Hanselmann
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1083 26d3fd2f Michael Hanselmann
                            errors.GetEncodedError(i.result)
1084 26d3fd2f Michael Hanselmann
                            for i in job.ops[opctx.index:])
1085 be760ba8 Michael Hanselmann
1086 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELING:
1087 26d3fd2f Michael Hanselmann
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1088 26d3fd2f Michael Hanselmann
                                "Job canceled by request")
1089 26d3fd2f Michael Hanselmann
          finalize = True
1090 26d3fd2f Michael Hanselmann
1091 26d3fd2f Michael Hanselmann
        elif op.status == constants.OP_STATUS_CANCELED:
1092 26d3fd2f Michael Hanselmann
          finalize = True
1093 26d3fd2f Michael Hanselmann
1094 26d3fd2f Michael Hanselmann
        else:
1095 26d3fd2f Michael Hanselmann
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1096 26d3fd2f Michael Hanselmann
1097 26d3fd2f Michael Hanselmann
        # Finalizing or last opcode?
1098 26d3fd2f Michael Hanselmann
        if finalize or opctx.index == (opcount - 1):
1099 26d3fd2f Michael Hanselmann
          # All opcodes have been run, finalize job
1100 26d3fd2f Michael Hanselmann
          job.end_timestamp = TimeStampNow()
1101 26d3fd2f Michael Hanselmann
1102 26d3fd2f Michael Hanselmann
        # Write to disk. If the job status is final, this is the final write
1103 26d3fd2f Michael Hanselmann
        # allowed. Once the file has been written, it can be archived anytime.
1104 26d3fd2f Michael Hanselmann
        queue.UpdateJobUnlocked(job)
1105 be760ba8 Michael Hanselmann
1106 26d3fd2f Michael Hanselmann
        if finalize or opctx.index == (opcount - 1):
1107 26d3fd2f Michael Hanselmann
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1108 26d3fd2f Michael Hanselmann
          return True
1109 be760ba8 Michael Hanselmann
1110 be760ba8 Michael Hanselmann
      return False
1111 be760ba8 Michael Hanselmann
    finally:
1112 be760ba8 Michael Hanselmann
      queue.release()
1113 be760ba8 Michael Hanselmann
1114 be760ba8 Michael Hanselmann
1115 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker executing the jobs handed out by the job queue worker pool.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Runs one processing step of a job.

    The job is passed to a L{_JobProcessor} together with the opcode
    executor of a freshly created processor. If the job processor does not
    report the job as finished, the task is deferred so that it gets
    scheduled again with the job's current priority.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @raise workerpool.DeferTask: if the job must be scheduled again

    """
    owner = job.queue
    assert owner == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    executor = mcpu.Processor(owner.context, job.id).ExecOpCode
    finished = _JobProcessor(owner, executor, job)()
    if finished:
      return

    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())
1139 e2715f69 Michael Hanselmann
1140 e2715f69 Michael Hanselmann
1141 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} instances for one job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the queue it belongs to.

    @param queue: the job queue object, made available as C{self.queue}

    """
    pool_args = ("JobQueue", JOBQUEUE_THREADS, _JobQueueWorker)
    super(_JobQueueWorkerPool, self).__init__(*pool_args)
    self.queue = queue
1150 e2715f69 Michael Hanselmann
1151 e2715f69 Michael Hanselmann
1152 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
1153 6c881c52 Iustin Pop
  """Decorator for "public" functions.
1154 ea03467c Iustin Pop

1155 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
1156 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
1157 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
1158 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
1159 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
1160 ea03467c Iustin Pop

1161 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
1162 f1da30e6 Michael Hanselmann

1163 6c881c52 Iustin Pop
  Example::
1164 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
1165 6c881c52 Iustin Pop
    @_RequireOpenQueue
1166 6c881c52 Iustin Pop
    def Example(self):
1167 6c881c52 Iustin Pop
      pass
1168 db37da70 Michael Hanselmann

1169 6c881c52 Iustin Pop
  """
1170 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
1171 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
1172 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
1173 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
1174 6c881c52 Iustin Pop
  return wrapper
1175 db37da70 Michael Hanselmann
1176 db37da70 Michael Hanselmann
1177 6c881c52 Iustin Pop
class JobQueue(object):
1178 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
1179 db37da70 Michael Hanselmann

1180 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1181 6c881c52 Iustin Pop

1182 6c881c52 Iustin Pop
  """
1183 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1184 db37da70 Michael Hanselmann
1185 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the in-memory state of the queue, takes the queue file lock
    and then inspects the jobs currently on disk, so that they are either
    restarted (if they were queued) or aborted (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe
    # to run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it, be it shared or exclusive.
    big_lock = locking.SharedLock("JobQueue")
    self._lock = big_lock

    self.acquire = big_lock.acquire
    self.release = big_lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes; only master candidates are tracked
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip

    # Remove master node
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      # Whatever went wrong, stop the workers before passing the error on
      self._wpool.TerminateWorkers()
      raise
1242 711b5124 Michael Hanselmann
1243 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    Queued jobs are collected for restarting. Jobs found waiting for locks
    are reset to queued and restarted as well, while jobs found running or
    canceling get their unfinished opcodes marked as failed.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    # Statuses of jobs which were being worked on when the daemon stopped
    interrupted = (constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_WAITLOCK,
                   constants.JOB_STATUS_CANCELING)

    to_restart = []

    job_ids = self._GetJobIDsUnlocked()
    total = len(job_ids)
    last_report = time.time()

    for pos, job_id in enumerate(job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (pos % 1000 == 0 or time.time() >= (last_report + 10.0) or
          pos == (total - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     pos, total - 1, 100.0 * (pos + 1) / total)
        last_report = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        to_restart.append(job)
        continue

      if status not in interrupted:
        continue

      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status != constants.JOB_STATUS_WAITLOCK:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
      else:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        to_restart.append(job)

      self.UpdateJobUnlocked(job)

    if to_restart:
      logging.info("Restarting %s jobs", len(to_restart))
      self._EnqueueJobs(to_restart)

    logging.info("Job queue inspection finished")
1298 85f03e0d Michael Hanselmann
1299 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is a
    master candidate, all non-archived job files and the serial file are
    uploaded to it and the node is recorded for future replication;
    otherwise it is dropped from the list of known nodes.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    upload = [self._GetJobPath(job_id)
              for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload:
      # Read file content
      content = utils.ReadFile(path)

      result = rpc.RpcRunner.call_jobqueue_update([name],
                                                  [node.primary_ip],
                                                  path, content)
      upload_msg = result[name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_msg)

    self._nodes[name] = node.primary_ip
1343 d2e03a33 Michael Hanselmann
1344 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Forgets the node, if it is known, so that it no longer receives queue
    updates. Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      pass
1354 23752136 Michael Hanselmann
1355 7e950d31 Iustin Pop
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, every failed per-node result is logged. An additional
    error is logged if the failed nodes outnumber the successful ones,
    with the master node counted among the successes.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    ok_count = 0
    failed_count = 0

    for node in nodes:
      node_result = result[node]
      msg = node_result.fail_msg
      if not msg:
        ok_count += 1
        continue

      failed_count += 1
      logging.error("RPC call %s (%s) failed on node %s: %s",
                    node_result.call, failmsg, node, msg)

    # +1 for the master node
    if (ok_count + 1) < failed_count:
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
1386 e74798c1 Michael Hanselmann
1387 99aabbed Iustin Pop
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    Both lists are in the same order, i.e. the address at a given index
    belongs to the name at the same index.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    names = self._nodes.keys()

    addresses = []
    for name in names:
      addresses.append(self._nodes[name])

    return (names, addresses)
1399 99aabbed Iustin Pop
1400 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The file is written on the local node using the master daemon's
    uid/gid as owner. If requested, the same content is then sent to all
    the other nodes we have and the RPC results are checked.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    ents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=ents.masterd_uid,
                    gid=ents.masterd_gid)

    if not replicate:
      return

    (node_names, node_addrs) = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1422 23752136 Michael Hanselmann
1423 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    Every (old, new) pair is renamed in the local queue directory first,
    creating missing target directories; afterwards the same list of
    renames is sent to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... and on all nodes
    (node_names, node_addrs) = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1441 abc1f2ce Michael Hanselmann
1442 7e950d31 Iustin Pop
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this is just C{str(job_id)} after checking that the ID is
    a non-negative integer, but having a single place for it allows
    changing the job ID format later on.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or is
        negative

    """
    if isinstance(job_id, (int, long)):
      if job_id < 0:
        raise errors.ProgrammerError("Job ID %s is negative" % job_id)
      return str(job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1462 85f03e0d Michael Hanselmann
1463 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Archived jobs are grouped into buckets of JOBS_PER_ARCHIVE_DIRECTORY
    jobs each; the directory name is the number of the bucket.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: the bucket number must be an integer no
    # matter which division semantics are in effect for "/"
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1474 58b22b6e Michael Hanselmann
1475 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The
    highest serial of the newly allocated range is written to the serial
    file (and replicated) before the in-memory counter is advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} job identifiers, in ascending
        order

    """
    assert count > 0

    # The new range starts right after the last serial handed out
    first = self._last_serial + 1
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v) for v in range(first, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1500 f1da30e6 Michael Hanselmann
1501 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file inside the queue directory

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1512 f1da30e6 Michael Hanselmann
1513 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file, located in the job's
        archive directory below the queue archive directory

    """
    subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, subdir, file_name)
1525 0cb94105 Michael Hanselmann
1526 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs). Only files whose name matches L{_RE_JOB_FILE} are taken
    into account.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = [self._RE_JOB_FILE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
1547 911a495b Iustin Pop
1548 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache. A job file which cannot
    be parsed is moved to the archive and None is returned.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    cached = self._memcache.get(job_id, None)
    if cached:
      logging.debug("Found job %s in memcache", job_id)
      return cached

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      src = self._GetJobPath(job_id)
      dst = self._GetArchivedJobPath(job_id)
      if src != dst:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(src, dst)])
      else:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      return None

    if job is None:
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1584 162c8636 Guido Trotter
1585 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1586 162c8636 Guido Trotter
    """Load the given job file from disk.
1587 162c8636 Guido Trotter

1588 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1589 162c8636 Guido Trotter

1590 162c8636 Guido Trotter
    @type job_id: string
1591 162c8636 Guido Trotter
    @param job_id: job identifier
1592 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1593 162c8636 Guido Trotter
    @return: either None or the job object
1594 162c8636 Guido Trotter

1595 162c8636 Guido Trotter
    """
1596 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1597 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1598 f1da30e6 Michael Hanselmann
    try:
1599 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1600 162c8636 Guido Trotter
    except EnvironmentError, err:
1601 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1602 f1da30e6 Michael Hanselmann
        return None
1603 f1da30e6 Michael Hanselmann
      raise
1604 13998ef2 Michael Hanselmann
1605 94ed59a5 Iustin Pop
    try:
1606 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1607 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1608 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1609 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1610 94ed59a5 Iustin Pop
1611 ac0930b9 Iustin Pop
    return job
1612 f1da30e6 Michael Hanselmann
1613 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk, without raising load errors.

    Works like L{_LoadJobFromDisk}, except that a corrupted job file or
    an environment error while reading it is logged and reported to the
    caller as None.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
1631 0f9c08dc Guido Trotter
1632 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
1644 686d7433 Iustin Pop
1645 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    The size is the number of job IDs currently found on disk.

    """
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)
1650 20571a26 Guido Trotter
1651 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is represented by the queue drain file: an empty file is
    written to set the flag, and the file is removed to unset it. The
    value cached on the queue object is updated as well.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    ents = runtime.GetEnts()
    drain_file = constants.JOB_QUEUE_DRAIN_FILE

    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      utils.WriteFile(drain_file, data="", close=True,
                      uid=ents.masterd_uid, gid=ents.masterd_gid)

    self._drained = drain_flag

    return True
1671 3ccafd0e Iustin Pop
1672 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Build a new job, persist it and register it in the memory cache.

    The job is validated, written out through L{UpdateJobUnlocked},
    counted in the cached queue size and stored in C{self._memcache}.
    Handing the job over to the worker pool is left to the caller.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Checking the flag is fine while only sharing the big job queue lock,
    # because the drain file is created while the lock is held exclusively.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # Reject the whole job as soon as one opcode carries a priority which
    # may not be used at submission time
    valid_prios = constants.OP_PRIO_SUBMIT_VALID
    for (pos, opcode) in enumerate(new_job.ops):
      if opcode.priority in valid_prios:
        continue

      allowed = utils.CommaJoin(valid_prios)
      raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                " are %s" % (pos, opcode.priority, allowed))

    # Persist the job before accounting for it
    self.UpdateJobUnlocked(new_job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    return new_job
1716 f1da30e6 Michael Hanselmann
1717 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create, store and enqueue a single new job.

    A fresh serial is allocated, the job is created through
    L{_SubmitJobUnlocked} and then handed to L{_EnqueueJobs}.

    @see: L{_SubmitJobUnlocked}
    @return: the ID allocated for the new job

    """
    new_ids = self._NewSerialsUnlocked(1)
    job_id = new_ids[0]

    job = self._SubmitJobUnlocked(job_id, ops)
    self._EnqueueJobs([job])

    return job_id
1728 2971c913 Iustin Pop
1729 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1730 2971c913 Iustin Pop
  @_RequireOpenQueue
1731 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1732 2971c913 Iustin Pop
    """Create and store multiple jobs.
1733 2971c913 Iustin Pop

1734 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1735 2971c913 Iustin Pop

1736 2971c913 Iustin Pop
    """
1737 2971c913 Iustin Pop
    results = []
1738 7b5c4a69 Michael Hanselmann
    added_jobs = []
1739 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1740 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1741 2971c913 Iustin Pop
      try:
1742 7b5c4a69 Michael Hanselmann
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1743 2971c913 Iustin Pop
        status = True
1744 7beb1e53 Guido Trotter
        data = job_id
1745 2971c913 Iustin Pop
      except errors.GenericError, err:
1746 2971c913 Iustin Pop
        data = str(err)
1747 2971c913 Iustin Pop
        status = False
1748 2971c913 Iustin Pop
      results.append((status, data))
1749 7b5c4a69 Michael Hanselmann
1750 7b5c4a69 Michael Hanselmann
    self._EnqueueJobs(added_jobs)
1751 2971c913 Iustin Pop
1752 2971c913 Iustin Pop
    return results
1753 2971c913 Iustin Pop
1754 7b5c4a69 Michael Hanselmann
  def _EnqueueJobs(self, jobs):
1755 7b5c4a69 Michael Hanselmann
    """Helper function to add jobs to worker pool's queue.
1756 7b5c4a69 Michael Hanselmann

1757 7b5c4a69 Michael Hanselmann
    @type jobs: list
1758 7b5c4a69 Michael Hanselmann
    @param jobs: List of all jobs
1759 7b5c4a69 Michael Hanselmann

1760 7b5c4a69 Michael Hanselmann
    """
1761 7b5c4a69 Michael Hanselmann
    self._wpool.AddManyTasks([(job, ) for job in jobs],
1762 7b5c4a69 Michael Hanselmann
                             priority=[job.CalcPriority() for job in jobs])
1763 7b5c4a69 Michael Hanselmann
1764 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Write a job's current state to its queue file.

    Must be called after a job has been modified: the job is serialized
    to JSON and passed to L{_UpdateJobQueueFile}, which receives the
    C{replicate} flag unchanged.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)

    logging.debug("Writing job %s to %s", job.id, job_path)
    self._UpdateJobQueueFile(job_path, serialized, replicate)
1782 ac0930b9 Iustin Pop
1783 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Block until a job changes or the timeout expires.

    The actual waiting is delegated to a L{_WaitForJobChangesHelper}
    instance, which is given the job's file path and a loader bound to
    L{SafeLoadJobFromDisk} for this job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    loader = compat.partial(self.SafeLoadJobFromDisk, job_id)
    waiter = _WaitForJobChangesHelper()
    job_path = self._GetJobPath(job_id)

    return waiter(job_path, loader,
                  fields, prev_job_info, prev_log_serial, timeout)
1813 dfe57c22 Michael Hanselmann
1814 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Try to cancel a job.

    The job is loaded and asked to cancel itself; when it reports
    success its new state is written out via L{UpdateJobUnlocked}.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}; success is False when the job could
        not be found, otherwise both values come from the job's
        C{Cancel} method

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (cancelled, message) = job.Cancel()

    # Only a successful cancellation changes the job, so only then is
    # there something to store
    if cancelled:
      self.UpdateJobUnlocked(job)

    return (cancelled, message)
1838 fbf0262f Michael Hanselmann
1839 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Move finalized jobs to the archive.

    Jobs whose status is not in L{constants.JOBS_FINALIZED} are skipped.
    For the others the job file is renamed to its archive path.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    finalized = []
    renames = []

    for job in jobs:
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        finalized.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in finalized))

    # The renames above are not checked for success, hence the cached queue
    # size is refreshed from the filesystem instead of being decremented.
    # Once the TODO above is addressed, the number of actually archived
    # jobs can be used here.
    self._UpdateQueueSizeUnlocked()

    return len(finalized)
1874 78d12585 Michael Hanselmann
1875 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archive a single job.

    Thin wrapper which loads the job and passes it to
    L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived = self._ArchiveJobsUnlocked([job])
      return archived == 1

    logging.debug("Job %s not found", job_id)
    return False
1896 07cd723a Iustin Pop
1897 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and if that is missing too, the
    received timestamp. The special '-1' age will cause archival of
    all jobs (that are not running or queued).

    Jobs are examined in the order returned by L{_GetJobIDsUnlocked}
    and archived in batches of ten. Examination stops once C{timeout}
    seconds have elapsed; jobs already selected by then are still
    archived.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: maximum time, in seconds, to spend examining jobs
    @rtype: tuple
    @return: C{(archived, unchecked)}: the number of jobs archived and
        the number of jobs which were not examined because the timeout
        expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after the timeout check has passed;
      # otherwise the job we stopped at would be missing from the
      # "unchecked" number returned to the caller.
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp available for the job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared
      # (presumably whole seconds -- verify against _QueuedJob)
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1952 07cd723a Iustin Pop
1953 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
    """Return the requested fields for a set of jobs.

    When explicit job IDs are given, a job which cannot be loaded is
    reported as None at its position. When all jobs are listed, such
    jobs are silently left out.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    list_all = not job_ids
    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    result = []
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is None:
        if not list_all:
          result.append(None)
      else:
        result.append(job.GetInfo(fields))

    return result
1981 e2715f69 Michael Hanselmann
1982 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stop the job queue.

    The worker threads are terminated first, then the queue file lock
    is closed and the reference to it dropped.

    """
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_filelock
    queue_lock.Close()
    self._queue_filelock = None