Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ a0d2fe2c

History | View | Annotate | Download (50.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 7f93570a Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 f1da30e6 Michael Hanselmann
import errno
35 f1da30e6 Michael Hanselmann
import re
36 f1048938 Iustin Pop
import time
37 5685c1a5 Michael Hanselmann
import weakref
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 e2715f69 Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
62 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63 e2715f69 Michael Hanselmann
64 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
65 ebb80afa Guido Trotter
_LOCK = "_lock"
66 ebb80afa Guido Trotter
_QUEUE = "_queue"
67 99bd4f0a Guido Trotter
68 498ae1cc Iustin Pop
69 9728ae5d Iustin Pop
class CancelJob(Exception):
70 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
71 fbf0262f Michael Hanselmann

72 fbf0262f Michael Hanselmann
  """
73 fbf0262f Michael Hanselmann
74 fbf0262f Michael Hanselmann
75 70552c46 Michael Hanselmann
def TimeStampNow():
76 ea03467c Iustin Pop
  """Returns the current timestamp.
77 ea03467c Iustin Pop

78 ea03467c Iustin Pop
  @rtype: tuple
79 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
80 ea03467c Iustin Pop

81 ea03467c Iustin Pop
  """
82 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
83 70552c46 Michael Hanselmann
84 70552c46 Michael Hanselmann
85 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
86 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
87 e2715f69 Michael Hanselmann

88 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
89 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
90 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
91 ea03467c Iustin Pop
  @ivar status: the current status
92 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
93 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
94 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
96 f1048938 Iustin Pop

97 e2715f69 Michael Hanselmann
  """
98 8f5c488d Michael Hanselmann
  __slots__ = ["input", "status", "result", "log", "priority",
99 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
100 66d895a8 Iustin Pop
               "__weakref__"]
101 66d895a8 Iustin Pop
102 85f03e0d Michael Hanselmann
  def __init__(self, op):
103 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
106 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
107 ea03467c Iustin Pop

108 ea03467c Iustin Pop
    """
109 85f03e0d Michael Hanselmann
    self.input = op
110 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
111 85f03e0d Michael Hanselmann
    self.result = None
112 85f03e0d Michael Hanselmann
    self.log = []
113 70552c46 Michael Hanselmann
    self.start_timestamp = None
114 b9b5abcb Iustin Pop
    self.exec_timestamp = None
115 70552c46 Michael Hanselmann
    self.end_timestamp = None
116 f1da30e6 Michael Hanselmann
117 8f5c488d Michael Hanselmann
    # Get initial priority (it might change during the lifetime of this opcode)
118 8f5c488d Michael Hanselmann
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119 8f5c488d Michael Hanselmann
120 f1da30e6 Michael Hanselmann
  @classmethod
121 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
122 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
123 ea03467c Iustin Pop

124 ea03467c Iustin Pop
    @type state: dict
125 ea03467c Iustin Pop
    @param state: the serialized state
126 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
127 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
128 ea03467c Iustin Pop

129 ea03467c Iustin Pop
    """
130 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
131 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132 85f03e0d Michael Hanselmann
    obj.status = state["status"]
133 85f03e0d Michael Hanselmann
    obj.result = state["result"]
134 85f03e0d Michael Hanselmann
    obj.log = state["log"]
135 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
136 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
137 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
138 8f5c488d Michael Hanselmann
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139 f1da30e6 Michael Hanselmann
    return obj
140 f1da30e6 Michael Hanselmann
141 f1da30e6 Michael Hanselmann
  def Serialize(self):
142 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
143 ea03467c Iustin Pop

144 ea03467c Iustin Pop
    @rtype: dict
145 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
146 ea03467c Iustin Pop

147 ea03467c Iustin Pop
    """
148 6c5a7090 Michael Hanselmann
    return {
149 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
150 6c5a7090 Michael Hanselmann
      "status": self.status,
151 6c5a7090 Michael Hanselmann
      "result": self.result,
152 6c5a7090 Michael Hanselmann
      "log": self.log,
153 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
154 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
155 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
156 8f5c488d Michael Hanselmann
      "priority": self.priority,
157 6c5a7090 Michael Hanselmann
      }
158 f1048938 Iustin Pop
159 e2715f69 Michael Hanselmann
160 e2715f69 Michael Hanselmann
class _QueuedJob(object):
161 e2715f69 Michael Hanselmann
  """In-memory job representation.
162 e2715f69 Michael Hanselmann

163 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
164 ea03467c Iustin Pop
  be taken care of by users of this class.
165 ea03467c Iustin Pop

166 ea03467c Iustin Pop
  @type queue: L{JobQueue}
167 ea03467c Iustin Pop
  @ivar queue: the parent queue
168 ea03467c Iustin Pop
  @ivar id: the job ID
169 ea03467c Iustin Pop
  @type ops: list
170 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
171 ea03467c Iustin Pop
  @type log_serial: int
172 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
173 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
174 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
175 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
176 e2715f69 Michael Hanselmann

177 e2715f69 Michael Hanselmann
  """
178 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
179 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
180 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
181 66d895a8 Iustin Pop
               "__weakref__"]
182 66d895a8 Iustin Pop
183 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
184 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
185 ea03467c Iustin Pop

186 ea03467c Iustin Pop
    @type queue: L{JobQueue}
187 ea03467c Iustin Pop
    @param queue: our parent queue
188 ea03467c Iustin Pop
    @type job_id: job_id
189 ea03467c Iustin Pop
    @param job_id: our job id
190 ea03467c Iustin Pop
    @type ops: list
191 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
192 ea03467c Iustin Pop
        in _QueuedOpCodes
193 ea03467c Iustin Pop

194 ea03467c Iustin Pop
    """
195 e2715f69 Michael Hanselmann
    if not ops:
196 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
197 e2715f69 Michael Hanselmann
198 85f03e0d Michael Hanselmann
    self.queue = queue
199 f1da30e6 Michael Hanselmann
    self.id = job_id
200 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
201 6c5a7090 Michael Hanselmann
    self.log_serial = 0
202 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
203 c56ec146 Iustin Pop
    self.start_timestamp = None
204 c56ec146 Iustin Pop
    self.end_timestamp = None
205 6c5a7090 Michael Hanselmann
206 9fa2e150 Michael Hanselmann
  def __repr__(self):
207 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
208 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
209 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
210 9fa2e150 Michael Hanselmann
211 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
212 9fa2e150 Michael Hanselmann
213 f1da30e6 Michael Hanselmann
  @classmethod
214 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
215 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
216 ea03467c Iustin Pop

217 ea03467c Iustin Pop
    @type queue: L{JobQueue}
218 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
219 ea03467c Iustin Pop
    @type state: dict
220 ea03467c Iustin Pop
    @param state: the serialized state
221 ea03467c Iustin Pop
    @rtype: _JobQueue
222 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
223 ea03467c Iustin Pop

224 ea03467c Iustin Pop
    """
225 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
226 85f03e0d Michael Hanselmann
    obj.queue = queue
227 85f03e0d Michael Hanselmann
    obj.id = state["id"]
228 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
229 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
230 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
231 6c5a7090 Michael Hanselmann
232 6c5a7090 Michael Hanselmann
    obj.ops = []
233 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
234 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
235 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
236 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
237 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
238 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
239 6c5a7090 Michael Hanselmann
240 f1da30e6 Michael Hanselmann
    return obj
241 f1da30e6 Michael Hanselmann
242 f1da30e6 Michael Hanselmann
  def Serialize(self):
243 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
244 ea03467c Iustin Pop

245 ea03467c Iustin Pop
    @rtype: dict
246 ea03467c Iustin Pop
    @return: the serialized state
247 ea03467c Iustin Pop

248 ea03467c Iustin Pop
    """
249 f1da30e6 Michael Hanselmann
    return {
250 f1da30e6 Michael Hanselmann
      "id": self.id,
251 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
252 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
253 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
254 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
255 f1da30e6 Michael Hanselmann
      }
256 f1da30e6 Michael Hanselmann
257 85f03e0d Michael Hanselmann
  def CalcStatus(self):
258 ea03467c Iustin Pop
    """Compute the status of this job.
259 ea03467c Iustin Pop

260 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
261 ea03467c Iustin Pop
    based on their status, computes the job status.
262 ea03467c Iustin Pop

263 ea03467c Iustin Pop
    The algorithm is:
264 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
265 ea03467c Iustin Pop
        status will be the same
266 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
267 ea03467c Iustin Pop
          - waitlock
268 fbf0262f Michael Hanselmann
          - canceling
269 ea03467c Iustin Pop
          - running
270 ea03467c Iustin Pop

271 ea03467c Iustin Pop
        will determine the job status
272 ea03467c Iustin Pop

273 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
274 ea03467c Iustin Pop
        and the job status will be the same
275 ea03467c Iustin Pop

276 ea03467c Iustin Pop
    @return: the job status
277 ea03467c Iustin Pop

278 ea03467c Iustin Pop
    """
279 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
280 e2715f69 Michael Hanselmann
281 e2715f69 Michael Hanselmann
    all_success = True
282 85f03e0d Michael Hanselmann
    for op in self.ops:
283 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
284 e2715f69 Michael Hanselmann
        continue
285 e2715f69 Michael Hanselmann
286 e2715f69 Michael Hanselmann
      all_success = False
287 e2715f69 Michael Hanselmann
288 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
289 e2715f69 Michael Hanselmann
        pass
290 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
291 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
292 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
293 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
294 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
295 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
296 fbf0262f Michael Hanselmann
        break
297 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
298 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
299 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
300 f1da30e6 Michael Hanselmann
        break
301 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
302 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
303 4cb1d919 Michael Hanselmann
        break
304 e2715f69 Michael Hanselmann
305 e2715f69 Michael Hanselmann
    if all_success:
306 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
307 e2715f69 Michael Hanselmann
308 e2715f69 Michael Hanselmann
    return status
309 e2715f69 Michael Hanselmann
310 8f5c488d Michael Hanselmann
  def CalcPriority(self):
311 8f5c488d Michael Hanselmann
    """Gets the current priority for this job.
312 8f5c488d Michael Hanselmann

313 8f5c488d Michael Hanselmann
    Only unfinished opcodes are considered. When all are done, the default
314 8f5c488d Michael Hanselmann
    priority is used.
315 8f5c488d Michael Hanselmann

316 8f5c488d Michael Hanselmann
    @rtype: int
317 8f5c488d Michael Hanselmann

318 8f5c488d Michael Hanselmann
    """
319 8f5c488d Michael Hanselmann
    priorities = [op.priority for op in self.ops
320 8f5c488d Michael Hanselmann
                  if op.status not in constants.OPS_FINALIZED]
321 8f5c488d Michael Hanselmann
322 8f5c488d Michael Hanselmann
    if not priorities:
323 8f5c488d Michael Hanselmann
      # All opcodes are done, assume default priority
324 8f5c488d Michael Hanselmann
      return constants.OP_PRIO_DEFAULT
325 8f5c488d Michael Hanselmann
326 8f5c488d Michael Hanselmann
    return min(priorities)
327 8f5c488d Michael Hanselmann
328 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
329 ea03467c Iustin Pop
    """Selectively returns the log entries.
330 ea03467c Iustin Pop

331 ea03467c Iustin Pop
    @type newer_than: None or int
332 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
333 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
334 ea03467c Iustin Pop
        than this value
335 ea03467c Iustin Pop
    @rtype: list
336 ea03467c Iustin Pop
    @return: the list of the log entries selected
337 ea03467c Iustin Pop

338 ea03467c Iustin Pop
    """
339 6c5a7090 Michael Hanselmann
    if newer_than is None:
340 6c5a7090 Michael Hanselmann
      serial = -1
341 6c5a7090 Michael Hanselmann
    else:
342 6c5a7090 Michael Hanselmann
      serial = newer_than
343 6c5a7090 Michael Hanselmann
344 6c5a7090 Michael Hanselmann
    entries = []
345 6c5a7090 Michael Hanselmann
    for op in self.ops:
346 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
347 6c5a7090 Michael Hanselmann
348 6c5a7090 Michael Hanselmann
    return entries
349 6c5a7090 Michael Hanselmann
350 6a290889 Guido Trotter
  def GetInfo(self, fields):
351 6a290889 Guido Trotter
    """Returns information about a job.
352 6a290889 Guido Trotter

353 6a290889 Guido Trotter
    @type fields: list
354 6a290889 Guido Trotter
    @param fields: names of fields to return
355 6a290889 Guido Trotter
    @rtype: list
356 6a290889 Guido Trotter
    @return: list with one element for each field
357 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
358 6a290889 Guido Trotter
        has been passed
359 6a290889 Guido Trotter

360 6a290889 Guido Trotter
    """
361 6a290889 Guido Trotter
    row = []
362 6a290889 Guido Trotter
    for fname in fields:
363 6a290889 Guido Trotter
      if fname == "id":
364 6a290889 Guido Trotter
        row.append(self.id)
365 6a290889 Guido Trotter
      elif fname == "status":
366 6a290889 Guido Trotter
        row.append(self.CalcStatus())
367 6a290889 Guido Trotter
      elif fname == "ops":
368 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
369 6a290889 Guido Trotter
      elif fname == "opresult":
370 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
371 6a290889 Guido Trotter
      elif fname == "opstatus":
372 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
373 6a290889 Guido Trotter
      elif fname == "oplog":
374 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
375 6a290889 Guido Trotter
      elif fname == "opstart":
376 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
377 6a290889 Guido Trotter
      elif fname == "opexec":
378 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
379 6a290889 Guido Trotter
      elif fname == "opend":
380 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
381 6a290889 Guido Trotter
      elif fname == "received_ts":
382 6a290889 Guido Trotter
        row.append(self.received_timestamp)
383 6a290889 Guido Trotter
      elif fname == "start_ts":
384 6a290889 Guido Trotter
        row.append(self.start_timestamp)
385 6a290889 Guido Trotter
      elif fname == "end_ts":
386 6a290889 Guido Trotter
        row.append(self.end_timestamp)
387 6a290889 Guido Trotter
      elif fname == "summary":
388 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
389 6a290889 Guido Trotter
      else:
390 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
391 6a290889 Guido Trotter
    return row
392 6a290889 Guido Trotter
393 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
394 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
395 34327f51 Iustin Pop

396 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
397 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
398 34327f51 Iustin Pop
    finalised are not changed.
399 34327f51 Iustin Pop

400 34327f51 Iustin Pop
    @param status: a given opcode status
401 34327f51 Iustin Pop
    @param result: the opcode result
402 34327f51 Iustin Pop

403 34327f51 Iustin Pop
    """
404 747f6113 Michael Hanselmann
    not_marked = True
405 747f6113 Michael Hanselmann
    for op in self.ops:
406 747f6113 Michael Hanselmann
      if op.status in constants.OPS_FINALIZED:
407 747f6113 Michael Hanselmann
        assert not_marked, "Finalized opcodes found after non-finalized ones"
408 747f6113 Michael Hanselmann
        continue
409 747f6113 Michael Hanselmann
      op.status = status
410 747f6113 Michael Hanselmann
      op.result = result
411 747f6113 Michael Hanselmann
      not_marked = False
412 34327f51 Iustin Pop
413 099b2870 Michael Hanselmann
  def Cancel(self):
414 a0d2fe2c Michael Hanselmann
    """Marks job as canceled/-ing if possible.
415 a0d2fe2c Michael Hanselmann

416 a0d2fe2c Michael Hanselmann
    @rtype: tuple; (bool, string)
417 a0d2fe2c Michael Hanselmann
    @return: Boolean describing whether job was successfully canceled or marked
418 a0d2fe2c Michael Hanselmann
      as canceling and a text message
419 a0d2fe2c Michael Hanselmann

420 a0d2fe2c Michael Hanselmann
    """
421 099b2870 Michael Hanselmann
    status = self.CalcStatus()
422 099b2870 Michael Hanselmann
423 099b2870 Michael Hanselmann
    if status not in (constants.JOB_STATUS_QUEUED,
424 099b2870 Michael Hanselmann
                      constants.JOB_STATUS_WAITLOCK):
425 099b2870 Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", self.id)
426 099b2870 Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % self.id)
427 099b2870 Michael Hanselmann
428 099b2870 Michael Hanselmann
    if status == constants.JOB_STATUS_QUEUED:
429 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
430 099b2870 Michael Hanselmann
                             "Job canceled by request")
431 099b2870 Michael Hanselmann
      msg = "Job %s canceled" % self.id
432 099b2870 Michael Hanselmann
433 099b2870 Michael Hanselmann
    elif status == constants.JOB_STATUS_WAITLOCK:
434 099b2870 Michael Hanselmann
      # The worker will notice the new status and cancel the job
435 099b2870 Michael Hanselmann
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
436 099b2870 Michael Hanselmann
      msg = "Job %s will be canceled" % self.id
437 099b2870 Michael Hanselmann
438 099b2870 Michael Hanselmann
    return (True, msg)
439 099b2870 Michael Hanselmann
440 f1048938 Iustin Pop
441 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
442 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
443 031a3e57 Michael Hanselmann
    """Initializes this class.
444 ea03467c Iustin Pop

445 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
446 031a3e57 Michael Hanselmann
    @param queue: Job queue
447 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
448 031a3e57 Michael Hanselmann
    @param job: Job object
449 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
450 031a3e57 Michael Hanselmann
    @param op: OpCode
451 031a3e57 Michael Hanselmann

452 031a3e57 Michael Hanselmann
    """
453 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
454 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
455 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
456 031a3e57 Michael Hanselmann
457 031a3e57 Michael Hanselmann
    self._queue = queue
458 031a3e57 Michael Hanselmann
    self._job = job
459 031a3e57 Michael Hanselmann
    self._op = op
460 031a3e57 Michael Hanselmann
461 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
462 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
463 dc1e2262 Michael Hanselmann

464 dc1e2262 Michael Hanselmann
    """
465 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
466 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
467 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
468 dc1e2262 Michael Hanselmann
      raise CancelJob()
469 dc1e2262 Michael Hanselmann
470 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
471 031a3e57 Michael Hanselmann
  def NotifyStart(self):
472 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
473 e92376d7 Iustin Pop

474 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
475 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
476 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
477 031a3e57 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
478 e92376d7 Iustin Pop

479 e92376d7 Iustin Pop
    """
480 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
481 271daef8 Iustin Pop
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
482 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
483 fbf0262f Michael Hanselmann
484 271daef8 Iustin Pop
    # Cancel here if we were asked to
485 dc1e2262 Michael Hanselmann
    self._CheckCancel()
486 fbf0262f Michael Hanselmann
487 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
488 9bdab621 Michael Hanselmann
489 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
490 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
491 271daef8 Iustin Pop
492 271daef8 Iustin Pop
    # And finally replicate the job status
493 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
494 031a3e57 Michael Hanselmann
495 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
496 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
497 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
498 9bf5e01f Guido Trotter

499 9bf5e01f Guido Trotter
    """
500 9bf5e01f Guido Trotter
    self._job.log_serial += 1
501 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
502 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
503 9bf5e01f Guido Trotter
504 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
505 031a3e57 Michael Hanselmann
    """Append a log entry.
506 031a3e57 Michael Hanselmann

507 031a3e57 Michael Hanselmann
    """
508 031a3e57 Michael Hanselmann
    assert len(args) < 3
509 031a3e57 Michael Hanselmann
510 031a3e57 Michael Hanselmann
    if len(args) == 1:
511 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
512 031a3e57 Michael Hanselmann
      log_msg = args[0]
513 031a3e57 Michael Hanselmann
    else:
514 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
515 031a3e57 Michael Hanselmann
516 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
517 031a3e57 Michael Hanselmann
    # precision.
518 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
519 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
520 031a3e57 Michael Hanselmann
521 acf931b7 Michael Hanselmann
  def CheckCancel(self):
522 acf931b7 Michael Hanselmann
    """Check whether job has been cancelled.
523 ef2df7d3 Michael Hanselmann

524 ef2df7d3 Michael Hanselmann
    """
525 dc1e2262 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
526 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
527 dc1e2262 Michael Hanselmann
528 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
529 dc1e2262 Michael Hanselmann
    self._CheckCancel()
530 dc1e2262 Michael Hanselmann
531 031a3e57 Michael Hanselmann
532 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
533 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
534 989a8bee Michael Hanselmann
    """Initializes this class.
535 6c2549d6 Guido Trotter

536 989a8bee Michael Hanselmann
    @type fields: list of strings
537 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
538 989a8bee Michael Hanselmann
    @type prev_job_info: string
539 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
540 989a8bee Michael Hanselmann
    @type prev_log_serial: string
541 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
542 6c2549d6 Guido Trotter

543 989a8bee Michael Hanselmann
    """
544 989a8bee Michael Hanselmann
    self._fields = fields
545 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
546 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
547 6c2549d6 Guido Trotter
548 989a8bee Michael Hanselmann
  def __call__(self, job):
549 989a8bee Michael Hanselmann
    """Checks whether job has changed.
550 6c2549d6 Guido Trotter

551 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
552 989a8bee Michael Hanselmann
    @param job: Job object
553 6c2549d6 Guido Trotter

554 6c2549d6 Guido Trotter
    """
555 989a8bee Michael Hanselmann
    status = job.CalcStatus()
556 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
557 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
558 6c2549d6 Guido Trotter
559 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
560 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
561 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
562 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
563 6c2549d6 Guido Trotter
    # significantly different.
564 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
565 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
566 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
567 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
568 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
569 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
570 6c2549d6 Guido Trotter
571 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
572 6c2549d6 Guido Trotter
    # no changes.
573 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
574 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
575 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_WAITLOCK) or
576 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
577 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
578 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
579 989a8bee Michael Hanselmann
      return (job_info, log_entries)
580 6c2549d6 Guido Trotter
581 989a8bee Michael Hanselmann
    return None
582 989a8bee Michael Hanselmann
583 989a8bee Michael Hanselmann
584 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
585 989a8bee Michael Hanselmann
  def __init__(self, filename):
586 989a8bee Michael Hanselmann
    """Initializes this class.
587 989a8bee Michael Hanselmann

588 989a8bee Michael Hanselmann
    @type filename: string
589 989a8bee Michael Hanselmann
    @param filename: Path to job file
590 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
591 6c2549d6 Guido Trotter

592 989a8bee Michael Hanselmann
    """
593 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
594 989a8bee Michael Hanselmann
    self._inotify_handler = \
595 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
596 989a8bee Michael Hanselmann
    self._notifier = \
597 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
598 989a8bee Michael Hanselmann
    try:
599 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
600 989a8bee Michael Hanselmann
    except Exception:
601 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
602 989a8bee Michael Hanselmann
      self._notifier.stop()
603 989a8bee Michael Hanselmann
      raise
604 989a8bee Michael Hanselmann
605 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
606 989a8bee Michael Hanselmann
    """Callback for inotify.
607 989a8bee Michael Hanselmann

608 989a8bee Michael Hanselmann
    """
609 6c2549d6 Guido Trotter
    if not notifier_enabled:
610 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
611 989a8bee Michael Hanselmann
612 989a8bee Michael Hanselmann
  def Wait(self, timeout):
613 989a8bee Michael Hanselmann
    """Waits for the job file to change.
614 989a8bee Michael Hanselmann

615 989a8bee Michael Hanselmann
    @type timeout: float
616 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
617 989a8bee Michael Hanselmann
    @return: Whether there have been events
618 989a8bee Michael Hanselmann

619 989a8bee Michael Hanselmann
    """
620 989a8bee Michael Hanselmann
    assert timeout >= 0
621 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
622 989a8bee Michael Hanselmann
    if have_events:
623 989a8bee Michael Hanselmann
      self._notifier.read_events()
624 989a8bee Michael Hanselmann
    self._notifier.process_events()
625 989a8bee Michael Hanselmann
    return have_events
626 989a8bee Michael Hanselmann
627 989a8bee Michael Hanselmann
  def Close(self):
628 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
629 989a8bee Michael Hanselmann

630 989a8bee Michael Hanselmann
    """
631 989a8bee Michael Hanselmann
    self._notifier.stop()
632 989a8bee Michael Hanselmann
633 989a8bee Michael Hanselmann
634 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
635 989a8bee Michael Hanselmann
  def __init__(self, filename):
636 989a8bee Michael Hanselmann
    """Initializes this class.
637 989a8bee Michael Hanselmann

638 989a8bee Michael Hanselmann
    @type filename: string
639 989a8bee Michael Hanselmann
    @param filename: Path to job file
640 989a8bee Michael Hanselmann

641 989a8bee Michael Hanselmann
    """
642 989a8bee Michael Hanselmann
    self._filewaiter = None
643 989a8bee Michael Hanselmann
    self._filename = filename
644 6c2549d6 Guido Trotter
645 989a8bee Michael Hanselmann
  def Wait(self, timeout):
646 989a8bee Michael Hanselmann
    """Waits for a job to change.
647 6c2549d6 Guido Trotter

648 989a8bee Michael Hanselmann
    @type timeout: float
649 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
650 989a8bee Michael Hanselmann
    @return: Whether there have been events
651 989a8bee Michael Hanselmann

652 989a8bee Michael Hanselmann
    """
653 989a8bee Michael Hanselmann
    if self._filewaiter:
654 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
655 989a8bee Michael Hanselmann
656 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
657 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
658 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
659 989a8bee Michael Hanselmann
    # race condition.
660 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
661 989a8bee Michael Hanselmann
662 989a8bee Michael Hanselmann
    return True
663 989a8bee Michael Hanselmann
664 989a8bee Michael Hanselmann
  def Close(self):
665 989a8bee Michael Hanselmann
    """Closes underlying waiter.
666 989a8bee Michael Hanselmann

667 989a8bee Michael Hanselmann
    """
668 989a8bee Michael Hanselmann
    if self._filewaiter:
669 989a8bee Michael Hanselmann
      self._filewaiter.Close()
670 989a8bee Michael Hanselmann
671 989a8bee Michael Hanselmann
672 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
673 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
674 989a8bee Michael Hanselmann

675 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
676 989a8bee Michael Hanselmann
  the current job status has changed.
677 989a8bee Michael Hanselmann

678 989a8bee Michael Hanselmann
  """
679 989a8bee Michael Hanselmann
  @staticmethod
680 989a8bee Michael Hanselmann
  def _CheckForChanges(job_load_fn, check_fn):
681 989a8bee Michael Hanselmann
    job = job_load_fn()
682 989a8bee Michael Hanselmann
    if not job:
683 989a8bee Michael Hanselmann
      raise errors.JobLost()
684 989a8bee Michael Hanselmann
685 989a8bee Michael Hanselmann
    result = check_fn(job)
686 989a8bee Michael Hanselmann
    if result is None:
687 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
688 989a8bee Michael Hanselmann
689 989a8bee Michael Hanselmann
    return result
690 989a8bee Michael Hanselmann
691 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
692 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
693 989a8bee Michael Hanselmann
    """Waits for changes on a job.
694 989a8bee Michael Hanselmann

695 989a8bee Michael Hanselmann
    @type filename: string
696 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
697 989a8bee Michael Hanselmann
    @type job_load_fn: callable
698 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
699 989a8bee Michael Hanselmann
    @type fields: list of strings
700 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
701 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
702 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
703 989a8bee Michael Hanselmann
    @type prev_log_serial: int
704 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
705 989a8bee Michael Hanselmann
    @type timeout: float
706 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
707 989a8bee Michael Hanselmann

708 989a8bee Michael Hanselmann
    """
709 6c2549d6 Guido Trotter
    try:
710 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
711 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
712 989a8bee Michael Hanselmann
      try:
713 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
714 989a8bee Michael Hanselmann
                                          job_load_fn, check_fn),
715 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
716 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
717 989a8bee Michael Hanselmann
      finally:
718 989a8bee Michael Hanselmann
        waiter.Close()
719 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
720 6c2549d6 Guido Trotter
      return None
721 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
722 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
723 6c2549d6 Guido Trotter
724 6c2549d6 Guido Trotter
725 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
726 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
727 6760e4ed Michael Hanselmann

728 6760e4ed Michael Hanselmann
  """
729 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
730 6760e4ed Michael Hanselmann
    to_encode = err
731 6760e4ed Michael Hanselmann
  else:
732 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
733 6760e4ed Michael Hanselmann
734 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
735 6760e4ed Michael Hanselmann
736 6760e4ed Michael Hanselmann
737 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
738 031a3e57 Michael Hanselmann
  """The actual job workers.
739 031a3e57 Michael Hanselmann

740 031a3e57 Michael Hanselmann
  """
741 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
742 e2715f69 Michael Hanselmann
    """Job executor.
743 e2715f69 Michael Hanselmann

744 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
745 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
746 e2715f69 Michael Hanselmann

747 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
748 ea03467c Iustin Pop
    @param job: the job to be processed
749 ea03467c Iustin Pop

750 e2715f69 Michael Hanselmann
    """
751 daba67c7 Michael Hanselmann
    self.SetTaskName("Job%s" % job.id)
752 daba67c7 Michael Hanselmann
753 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
754 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
755 031a3e57 Michael Hanselmann
    queue = job.queue
756 e2715f69 Michael Hanselmann
    try:
757 85f03e0d Michael Hanselmann
      try:
758 85f03e0d Michael Hanselmann
        count = len(job.ops)
759 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
760 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
761 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
762 f6424741 Iustin Pop
            # this is a job that was partially completed before master
763 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
764 f6424741 Iustin Pop
            # are already completed successfully (if any did error
765 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
766 f6424741 Iustin Pop
            # resubmitted for processing)
767 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
768 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
769 f6424741 Iustin Pop
            continue
770 85f03e0d Michael Hanselmann
          try:
771 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
772 d21d09d6 Iustin Pop
                         op_summary)
773 85f03e0d Michael Hanselmann
774 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
775 85f03e0d Michael Hanselmann
            try:
776 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
777 e35344b4 Michael Hanselmann
                logging.debug("Canceling opcode")
778 df0fb067 Iustin Pop
                raise CancelJob()
779 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
780 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s waiting for locks",
781 e35344b4 Michael Hanselmann
                            idx + 1, count)
782 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
783 85f03e0d Michael Hanselmann
              op.result = None
784 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
785 c56ec146 Iustin Pop
              if idx == 0: # first opcode
786 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
787 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
788 85f03e0d Michael Hanselmann
789 38206f3c Iustin Pop
              input_opcode = op.input
790 85f03e0d Michael Hanselmann
            finally:
791 85f03e0d Michael Hanselmann
              queue.release()
792 85f03e0d Michael Hanselmann
793 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
794 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
795 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
796 85f03e0d Michael Hanselmann
797 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
798 85f03e0d Michael Hanselmann
            try:
799 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
800 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
801 85f03e0d Michael Hanselmann
              op.result = result
802 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
803 6ea72e43 Michael Hanselmann
              if idx == count - 1:
804 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
805 963a068b Michael Hanselmann
806 963a068b Michael Hanselmann
                # Consistency check
807 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
808 963a068b Michael Hanselmann
                                  for i in job.ops)
809 963a068b Michael Hanselmann
810 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
811 85f03e0d Michael Hanselmann
            finally:
812 85f03e0d Michael Hanselmann
              queue.release()
813 85f03e0d Michael Hanselmann
814 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
815 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
816 fbf0262f Michael Hanselmann
          except CancelJob:
817 fbf0262f Michael Hanselmann
            # Will be handled further up
818 fbf0262f Michael Hanselmann
            raise
819 85f03e0d Michael Hanselmann
          except Exception, err:
820 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
821 85f03e0d Michael Hanselmann
            try:
822 85f03e0d Michael Hanselmann
              try:
823 e35344b4 Michael Hanselmann
                logging.debug("Opcode %s/%s failed", idx + 1, count)
824 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
825 6760e4ed Michael Hanselmann
                op.result = _EncodeOpError(err)
826 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
827 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
828 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
829 963a068b Michael Hanselmann
830 963a068b Michael Hanselmann
                to_encode = errors.OpExecError("Preceding opcode failed")
831 963a068b Michael Hanselmann
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
832 6760e4ed Michael Hanselmann
                                      _EncodeOpError(to_encode))
833 963a068b Michael Hanselmann
834 963a068b Michael Hanselmann
                # Consistency check
835 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
836 963a068b Michael Hanselmann
                                  for i in job.ops[:idx])
837 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
838 963a068b Michael Hanselmann
                                  errors.GetEncodedError(i.result)
839 963a068b Michael Hanselmann
                                  for i in job.ops[idx:])
840 85f03e0d Michael Hanselmann
              finally:
841 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
842 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
843 85f03e0d Michael Hanselmann
            finally:
844 85f03e0d Michael Hanselmann
              queue.release()
845 85f03e0d Michael Hanselmann
            raise
846 85f03e0d Michael Hanselmann
847 fbf0262f Michael Hanselmann
      except CancelJob:
848 3c0d60d0 Guido Trotter
        queue.acquire(shared=1)
849 fbf0262f Michael Hanselmann
        try:
850 39ed3a98 Guido Trotter
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
851 39ed3a98 Guido Trotter
                                "Job canceled by request")
852 6ea72e43 Michael Hanselmann
          job.end_timestamp = TimeStampNow()
853 6ea72e43 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
854 fbf0262f Michael Hanselmann
        finally:
855 fbf0262f Michael Hanselmann
          queue.release()
856 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
857 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
858 85f03e0d Michael Hanselmann
      except:
859 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
860 e2715f69 Michael Hanselmann
    finally:
861 6ea72e43 Michael Hanselmann
      status = job.CalcStatus()
862 6ea72e43 Michael Hanselmann
      logging.info("Finished job %s, status = %s", job.id, status)
863 e2715f69 Michael Hanselmann
864 e2715f69 Michael Hanselmann
865 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
866 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
867 ea03467c Iustin Pop

868 ea03467c Iustin Pop
  """
869 5bdce580 Michael Hanselmann
  def __init__(self, queue):
870 89e2b4d2 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
871 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
872 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
873 5bdce580 Michael Hanselmann
    self.queue = queue
874 e2715f69 Michael Hanselmann
875 e2715f69 Michael Hanselmann
876 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
877 6c881c52 Iustin Pop
  """Decorator for "public" functions.
878 ea03467c Iustin Pop

879 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
880 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
881 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
882 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
883 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
884 ea03467c Iustin Pop

885 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
886 f1da30e6 Michael Hanselmann

887 6c881c52 Iustin Pop
  Example::
888 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
889 6c881c52 Iustin Pop
    @_RequireOpenQueue
890 6c881c52 Iustin Pop
    def Example(self):
891 6c881c52 Iustin Pop
      pass
892 db37da70 Michael Hanselmann

893 6c881c52 Iustin Pop
  """
894 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
895 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
896 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
897 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
898 6c881c52 Iustin Pop
  return wrapper
899 db37da70 Michael Hanselmann
900 db37da70 Michael Hanselmann
901 6c881c52 Iustin Pop
class JobQueue(object):
902 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
903 db37da70 Michael Hanselmann

904 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
905 6c881c52 Iustin Pop

906 6c881c52 Iustin Pop
  """
907 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
908 db37da70 Michael Hanselmann
909 85f03e0d Michael Hanselmann
  def __init__(self, context):
910 ea03467c Iustin Pop
    """Constructor for JobQueue.
911 ea03467c Iustin Pop

912 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
913 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
914 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
915 ea03467c Iustin Pop
    running).
916 ea03467c Iustin Pop

917 ea03467c Iustin Pop
    @type context: GanetiContext
918 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
919 ea03467c Iustin Pop
        data and other ganeti objects
920 ea03467c Iustin Pop

921 ea03467c Iustin Pop
    """
922 5bdce580 Michael Hanselmann
    self.context = context
923 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
924 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
925 f1da30e6 Michael Hanselmann
926 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
927 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
928 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
929 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
930 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
931 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
932 ebb80afa Guido Trotter
933 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
934 ebb80afa Guido Trotter
    self.release = self._lock.release
935 85f03e0d Michael Hanselmann
936 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
937 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
938 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
939 f1da30e6 Michael Hanselmann
940 04ab05ce Michael Hanselmann
    # Read serial file
941 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
942 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
943 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
944 c4beba1c Iustin Pop
945 23752136 Michael Hanselmann
    # Get initial list of nodes
946 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
947 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
948 59303563 Iustin Pop
                       if n.master_candidate)
949 8e00939c Michael Hanselmann
950 8e00939c Michael Hanselmann
    # Remove master node
951 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
952 23752136 Michael Hanselmann
953 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
954 23752136 Michael Hanselmann
955 20571a26 Guido Trotter
    self._queue_size = 0
956 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
957 20571a26 Guido Trotter
    self._drained = self._IsQueueMarkedDrain()
958 20571a26 Guido Trotter
959 85f03e0d Michael Hanselmann
    # Setup worker pool
960 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
961 85f03e0d Michael Hanselmann
    try:
962 de9d02c7 Michael Hanselmann
      self._InspectQueue()
963 de9d02c7 Michael Hanselmann
    except:
964 de9d02c7 Michael Hanselmann
      self._wpool.TerminateWorkers()
965 de9d02c7 Michael Hanselmann
      raise
966 711b5124 Michael Hanselmann
967 de9d02c7 Michael Hanselmann
  @locking.ssynchronized(_LOCK)
968 de9d02c7 Michael Hanselmann
  @_RequireOpenQueue
969 de9d02c7 Michael Hanselmann
  def _InspectQueue(self):
970 de9d02c7 Michael Hanselmann
    """Loads the whole job queue and resumes unfinished jobs.
971 de9d02c7 Michael Hanselmann

972 de9d02c7 Michael Hanselmann
    This function needs the lock here because WorkerPool.AddTask() may start a
973 de9d02c7 Michael Hanselmann
    job while we're still doing our work.
974 711b5124 Michael Hanselmann

975 de9d02c7 Michael Hanselmann
    """
976 de9d02c7 Michael Hanselmann
    logging.info("Inspecting job queue")
977 de9d02c7 Michael Hanselmann
978 de9d02c7 Michael Hanselmann
    all_job_ids = self._GetJobIDsUnlocked()
979 de9d02c7 Michael Hanselmann
    jobs_count = len(all_job_ids)
980 de9d02c7 Michael Hanselmann
    lastinfo = time.time()
981 de9d02c7 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
982 de9d02c7 Michael Hanselmann
      # Give an update every 1000 jobs or 10 seconds
983 de9d02c7 Michael Hanselmann
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
984 de9d02c7 Michael Hanselmann
          idx == (jobs_count - 1)):
985 de9d02c7 Michael Hanselmann
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
986 de9d02c7 Michael Hanselmann
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
987 711b5124 Michael Hanselmann
        lastinfo = time.time()
988 94ed59a5 Iustin Pop
989 de9d02c7 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
990 85f03e0d Michael Hanselmann
991 de9d02c7 Michael Hanselmann
      # a failure in loading the job can cause 'None' to be returned
992 de9d02c7 Michael Hanselmann
      if job is None:
993 de9d02c7 Michael Hanselmann
        continue
994 85f03e0d Michael Hanselmann
995 de9d02c7 Michael Hanselmann
      status = job.CalcStatus()
996 711b5124 Michael Hanselmann
997 5ef699a0 Michael Hanselmann
      if status in (constants.JOB_STATUS_QUEUED, ):
998 de9d02c7 Michael Hanselmann
        self._wpool.AddTask((job, ))
999 de9d02c7 Michael Hanselmann
1000 de9d02c7 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING,
1001 5ef699a0 Michael Hanselmann
                      constants.JOB_STATUS_WAITLOCK,
1002 de9d02c7 Michael Hanselmann
                      constants.JOB_STATUS_CANCELING):
1003 de9d02c7 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
1004 de9d02c7 Michael Hanselmann
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1005 de9d02c7 Michael Hanselmann
                              "Unclean master daemon shutdown")
1006 de9d02c7 Michael Hanselmann
        self.UpdateJobUnlocked(job)
1007 de9d02c7 Michael Hanselmann
1008 de9d02c7 Michael Hanselmann
    logging.info("Job queue inspection finished")
1009 85f03e0d Michael Hanselmann
1010 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1011 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1012 99aabbed Iustin Pop
  def AddNode(self, node):
1013 99aabbed Iustin Pop
    """Register a new node with the queue.
1014 99aabbed Iustin Pop

1015 99aabbed Iustin Pop
    @type node: L{objects.Node}
1016 99aabbed Iustin Pop
    @param node: the node object to be added
1017 99aabbed Iustin Pop

1018 99aabbed Iustin Pop
    """
1019 99aabbed Iustin Pop
    node_name = node.name
1020 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
1021 23752136 Michael Hanselmann
1022 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
1023 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1024 3cebe102 Michael Hanselmann
    msg = result.fail_msg
1025 c8457ce7 Iustin Pop
    if msg:
1026 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1027 c8457ce7 Iustin Pop
                      node_name, msg)
1028 23752136 Michael Hanselmann
1029 59303563 Iustin Pop
    if not node.master_candidate:
1030 59303563 Iustin Pop
      # remove if existing, ignoring errors
1031 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
1032 59303563 Iustin Pop
      # and skip the replication of the job ids
1033 59303563 Iustin Pop
      return
1034 59303563 Iustin Pop
1035 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
1036 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1037 23752136 Michael Hanselmann
1038 d2e03a33 Michael Hanselmann
    # Upload current serial file
1039 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1040 d2e03a33 Michael Hanselmann
1041 d2e03a33 Michael Hanselmann
    for file_name in files:
1042 9f774ee8 Michael Hanselmann
      # Read file content
1043 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
1044 9f774ee8 Michael Hanselmann
1045 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1046 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
1047 a3811745 Michael Hanselmann
                                                  file_name, content)
1048 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
1049 c8457ce7 Iustin Pop
      if msg:
1050 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1051 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1052 d2e03a33 Michael Hanselmann
1053 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1054 d2e03a33 Michael Hanselmann
1055 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1056 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1057 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1058 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1059 ea03467c Iustin Pop

1060 ea03467c Iustin Pop
    @type node_name: str
1061 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1062 ea03467c Iustin Pop

1063 ea03467c Iustin Pop
    """
1064 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1065 23752136 Michael Hanselmann
1066 7e950d31 Iustin Pop
  @staticmethod
1067 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1068 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1069 ea03467c Iustin Pop

1070 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1071 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1072 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1073 ea03467c Iustin Pop

1074 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1075 ea03467c Iustin Pop
    @type nodes: list
1076 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1077 ea03467c Iustin Pop
    @type failmsg: str
1078 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1079 ea03467c Iustin Pop

1080 ea03467c Iustin Pop
    """
1081 e74798c1 Michael Hanselmann
    failed = []
1082 e74798c1 Michael Hanselmann
    success = []
1083 e74798c1 Michael Hanselmann
1084 e74798c1 Michael Hanselmann
    for node in nodes:
1085 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1086 c8457ce7 Iustin Pop
      if msg:
1087 e74798c1 Michael Hanselmann
        failed.append(node)
1088 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1089 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1090 c8457ce7 Iustin Pop
      else:
1091 c8457ce7 Iustin Pop
        success.append(node)
1092 e74798c1 Michael Hanselmann
1093 e74798c1 Michael Hanselmann
    # +1 for the master node
1094 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1095 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1096 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1097 e74798c1 Michael Hanselmann
1098 99aabbed Iustin Pop
  def _GetNodeIp(self):
1099 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1100 99aabbed Iustin Pop

1101 ea03467c Iustin Pop
    @rtype: (list, list)
1102 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1103 ea03467c Iustin Pop
        names and the second one with the node addresses
1104 ea03467c Iustin Pop

1105 99aabbed Iustin Pop
    """
1106 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1107 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1108 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1109 99aabbed Iustin Pop
    return name_list, addr_list
1110 99aabbed Iustin Pop
1111 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1112 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1113 8e00939c Michael Hanselmann

1114 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1115 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1116 ea03467c Iustin Pop

1117 ea03467c Iustin Pop
    @type file_name: str
1118 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1119 ea03467c Iustin Pop
    @type data: str
1120 ea03467c Iustin Pop
    @param data: the new contents of the file
1121 4c36bdf5 Guido Trotter
    @type replicate: boolean
1122 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1123 ea03467c Iustin Pop

1124 8e00939c Michael Hanselmann
    """
1125 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1126 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1127 82b22e19 René Nussbaumer
                    gid=getents.masterd_gid)
1128 8e00939c Michael Hanselmann
1129 4c36bdf5 Guido Trotter
    if replicate:
1130 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1131 4c36bdf5 Guido Trotter
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1132 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1133 23752136 Michael Hanselmann
1134 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1135 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1136 ea03467c Iustin Pop

1137 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1138 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1139 ea03467c Iustin Pop

1140 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1141 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1142 ea03467c Iustin Pop

1143 ea03467c Iustin Pop
    """
1144 dd875d32 Michael Hanselmann
    # Rename them locally
1145 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1146 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1147 abc1f2ce Michael Hanselmann
1148 dd875d32 Michael Hanselmann
    # ... and on all nodes
1149 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1150 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1151 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1152 abc1f2ce Michael Hanselmann
1153 7e950d31 Iustin Pop
  @staticmethod
1154 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
1155 ea03467c Iustin Pop
    """Convert a job ID to string format.
1156 ea03467c Iustin Pop

1157 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
1158 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
1159 ea03467c Iustin Pop
    abstract this change.
1160 ea03467c Iustin Pop

1161 ea03467c Iustin Pop
    @type job_id: int or long
1162 ea03467c Iustin Pop
    @param job_id: the numeric job id
1163 ea03467c Iustin Pop
    @rtype: str
1164 ea03467c Iustin Pop
    @return: the formatted job id
1165 ea03467c Iustin Pop

1166 ea03467c Iustin Pop
    """
1167 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
1168 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1169 85f03e0d Michael Hanselmann
    if job_id < 0:
1170 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1171 85f03e0d Michael Hanselmann
1172 85f03e0d Michael Hanselmann
    return str(job_id)
1173 85f03e0d Michael Hanselmann
1174 58b22b6e Michael Hanselmann
  @classmethod
1175 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
1176 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
1177 58b22b6e Michael Hanselmann

1178 58b22b6e Michael Hanselmann
    @type job_id: str
1179 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
1180 58b22b6e Michael Hanselmann
    @rtype: str
1181 58b22b6e Michael Hanselmann
    @return: Directory name
1182 58b22b6e Michael Hanselmann

1183 58b22b6e Michael Hanselmann
    """
1184 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1185 58b22b6e Michael Hanselmann
1186 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1187 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1188 f1da30e6 Michael Hanselmann

1189 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1190 f1da30e6 Michael Hanselmann

1191 009e73d0 Iustin Pop
    @type count: integer
1192 009e73d0 Iustin Pop
    @param count: how many serials to return
1193 ea03467c Iustin Pop
    @rtype: str
1194 ea03467c Iustin Pop
    @return: a string representing the job identifier.
1195 f1da30e6 Michael Hanselmann

1196 f1da30e6 Michael Hanselmann
    """
1197 009e73d0 Iustin Pop
    assert count > 0
1198 f1da30e6 Michael Hanselmann
    # New number
1199 009e73d0 Iustin Pop
    serial = self._last_serial + count
1200 f1da30e6 Michael Hanselmann
1201 f1da30e6 Michael Hanselmann
    # Write to file
1202 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1203 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1204 f1da30e6 Michael Hanselmann
1205 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
1206 009e73d0 Iustin Pop
              for v in range(self._last_serial, serial + 1)]
1207 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1208 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1209 f1da30e6 Michael Hanselmann
1210 009e73d0 Iustin Pop
    return result
1211 f1da30e6 Michael Hanselmann
1212 85f03e0d Michael Hanselmann
  @staticmethod
1213 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1214 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1215 ea03467c Iustin Pop

1216 ea03467c Iustin Pop
    @type job_id: str
1217 ea03467c Iustin Pop
    @param job_id: the job identifier
1218 ea03467c Iustin Pop
    @rtype: str
1219 ea03467c Iustin Pop
    @return: the path to the job file
1220 ea03467c Iustin Pop

1221 ea03467c Iustin Pop
    """
1222 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1223 f1da30e6 Michael Hanselmann
1224 58b22b6e Michael Hanselmann
  @classmethod
1225 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
1226 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1227 ea03467c Iustin Pop

1228 ea03467c Iustin Pop
    @type job_id: str
1229 ea03467c Iustin Pop
    @param job_id: the job identifier
1230 ea03467c Iustin Pop
    @rtype: str
1231 ea03467c Iustin Pop
    @return: the path to the archived job file
1232 ea03467c Iustin Pop

1233 ea03467c Iustin Pop
    """
1234 0411c011 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1235 0411c011 Iustin Pop
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1236 0cb94105 Michael Hanselmann
1237 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
1238 911a495b Iustin Pop
    """Return all known job IDs.
1239 911a495b Iustin Pop

1240 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1241 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1242 ac0930b9 Iustin Pop
    extra IDs).
1243 ac0930b9 Iustin Pop

1244 85a1c57d Guido Trotter
    @type sort: boolean
1245 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1246 ea03467c Iustin Pop
    @rtype: list
1247 ea03467c Iustin Pop
    @return: the list of job IDs
1248 ea03467c Iustin Pop

1249 911a495b Iustin Pop
    """
1250 85a1c57d Guido Trotter
    jlist = []
1251 b5b8309d Guido Trotter
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1252 85a1c57d Guido Trotter
      m = self._RE_JOB_FILE.match(filename)
1253 85a1c57d Guido Trotter
      if m:
1254 85a1c57d Guido Trotter
        jlist.append(m.group(1))
1255 85a1c57d Guido Trotter
    if sort:
1256 85a1c57d Guido Trotter
      jlist = utils.NiceSort(jlist)
1257 f0d874fe Iustin Pop
    return jlist
1258 911a495b Iustin Pop
1259 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1260 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1261 ea03467c Iustin Pop

1262 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1263 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1264 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1265 ea03467c Iustin Pop

1266 ea03467c Iustin Pop
    @param job_id: the job id
1267 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1268 ea03467c Iustin Pop
    @return: either None or the job object
1269 ea03467c Iustin Pop

1270 ea03467c Iustin Pop
    """
1271 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1272 5685c1a5 Michael Hanselmann
    if job:
1273 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1274 5685c1a5 Michael Hanselmann
      return job
1275 ac0930b9 Iustin Pop
1276 3d6c5566 Guido Trotter
    try:
1277 3d6c5566 Guido Trotter
      job = self._LoadJobFromDisk(job_id)
1278 aa9f8167 Iustin Pop
      if job is None:
1279 aa9f8167 Iustin Pop
        return job
1280 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1281 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1282 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1283 3d6c5566 Guido Trotter
      if old_path == new_path:
1284 3d6c5566 Guido Trotter
        # job already archived (future case)
1285 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1286 3d6c5566 Guido Trotter
      else:
1287 3d6c5566 Guido Trotter
        # non-archived case
1288 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1289 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1290 3d6c5566 Guido Trotter
      return None
1291 162c8636 Guido Trotter
1292 162c8636 Guido Trotter
    self._memcache[job_id] = job
1293 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1294 162c8636 Guido Trotter
    return job
1295 162c8636 Guido Trotter
1296 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1297 162c8636 Guido Trotter
    """Load the given job file from disk.
1298 162c8636 Guido Trotter

1299 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1300 162c8636 Guido Trotter

1301 162c8636 Guido Trotter
    @type job_id: string
1302 162c8636 Guido Trotter
    @param job_id: job identifier
1303 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1304 162c8636 Guido Trotter
    @return: either None or the job object
1305 162c8636 Guido Trotter

1306 162c8636 Guido Trotter
    """
1307 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1308 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1309 f1da30e6 Michael Hanselmann
    try:
1310 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1311 162c8636 Guido Trotter
    except EnvironmentError, err:
1312 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1313 f1da30e6 Michael Hanselmann
        return None
1314 f1da30e6 Michael Hanselmann
      raise
1315 13998ef2 Michael Hanselmann
1316 94ed59a5 Iustin Pop
    try:
1317 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1318 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1319 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1320 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1321 94ed59a5 Iustin Pop
1322 ac0930b9 Iustin Pop
    return job
1323 f1da30e6 Michael Hanselmann
1324 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
1325 0f9c08dc Guido Trotter
    """Load the given job file from disk.
1326 0f9c08dc Guido Trotter

1327 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1328 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
1329 0f9c08dc Guido Trotter
    exception is logged.
1330 0f9c08dc Guido Trotter

1331 0f9c08dc Guido Trotter
    @type job_id: string
1332 0f9c08dc Guido Trotter
    @param job_id: job identifier
1333 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
1334 0f9c08dc Guido Trotter
    @return: either None or the job object
1335 0f9c08dc Guido Trotter

1336 0f9c08dc Guido Trotter
    """
1337 0f9c08dc Guido Trotter
    try:
1338 0f9c08dc Guido Trotter
      return self._LoadJobFromDisk(job_id)
1339 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
1340 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
1341 0f9c08dc Guido Trotter
      return None
1342 0f9c08dc Guido Trotter
1343 686d7433 Iustin Pop
  @staticmethod
1344 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
1345 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
1346 686d7433 Iustin Pop

1347 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
1348 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
1349 686d7433 Iustin Pop

1350 ea03467c Iustin Pop
    @rtype: boolean
1351 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
1352 ea03467c Iustin Pop

1353 686d7433 Iustin Pop
    """
1354 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1355 686d7433 Iustin Pop
1356 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1357 20571a26 Guido Trotter
    """Update the queue size.
1358 20571a26 Guido Trotter

1359 20571a26 Guido Trotter
    """
1360 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1361 20571a26 Guido Trotter
1362 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1363 20571a26 Guido Trotter
  @_RequireOpenQueue
1364 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
1365 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1366 3ccafd0e Iustin Pop

1367 ea03467c Iustin Pop
    @type drain_flag: boolean
1368 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1369 ea03467c Iustin Pop

1370 3ccafd0e Iustin Pop
    """
1371 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1372 82b22e19 René Nussbaumer
1373 3ccafd0e Iustin Pop
    if drain_flag:
1374 82b22e19 René Nussbaumer
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1375 82b22e19 René Nussbaumer
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
1376 3ccafd0e Iustin Pop
    else:
1377 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1378 20571a26 Guido Trotter
1379 20571a26 Guido Trotter
    self._drained = drain_flag
1380 20571a26 Guido Trotter
1381 3ccafd0e Iustin Pop
    return True
1382 3ccafd0e Iustin Pop
1383 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1384 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
1385 85f03e0d Michael Hanselmann
    """Create and store a new job.
1386 f1da30e6 Michael Hanselmann

1387 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
1388 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
1389 c3f0a12f Iustin Pop

1390 009e73d0 Iustin Pop
    @type job_id: job ID
1391 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
1392 c3f0a12f Iustin Pop
    @type ops: list
1393 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
1394 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
1395 7beb1e53 Guido Trotter
    @return: the job object to be queued
1396 7beb1e53 Guido Trotter
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1397 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1398 e71c8147 Michael Hanselmann
    @raise errors.GenericError: If an opcode is not valid
1399 c3f0a12f Iustin Pop

1400 c3f0a12f Iustin Pop
    """
1401 20571a26 Guido Trotter
    # Ok when sharing the big job queue lock, as the drain file is created when
1402 20571a26 Guido Trotter
    # the lock is exclusive.
1403 20571a26 Guido Trotter
    if self._drained:
1404 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1405 f87b405e Michael Hanselmann
1406 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1407 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
1408 f87b405e Michael Hanselmann
1409 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
1410 f1da30e6 Michael Hanselmann
1411 e71c8147 Michael Hanselmann
    # Check priority
1412 e71c8147 Michael Hanselmann
    for idx, op in enumerate(job.ops):
1413 e71c8147 Michael Hanselmann
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1414 e71c8147 Michael Hanselmann
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1415 e71c8147 Michael Hanselmann
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1416 e71c8147 Michael Hanselmann
                                  " are %s" % (idx, op.priority, allowed))
1417 e71c8147 Michael Hanselmann
1418 f1da30e6 Michael Hanselmann
    # Write to disk
1419 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
1420 f1da30e6 Michael Hanselmann
1421 20571a26 Guido Trotter
    self._queue_size += 1
1422 20571a26 Guido Trotter
1423 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1424 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1425 ac0930b9 Iustin Pop
1426 7beb1e53 Guido Trotter
    return job
1427 f1da30e6 Michael Hanselmann
1428 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1429 2971c913 Iustin Pop
  @_RequireOpenQueue
1430 2971c913 Iustin Pop
  def SubmitJob(self, ops):
1431 2971c913 Iustin Pop
    """Create and store a new job.
1432 2971c913 Iustin Pop

1433 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1434 2971c913 Iustin Pop

1435 2971c913 Iustin Pop
    """
1436 009e73d0 Iustin Pop
    job_id = self._NewSerialsUnlocked(1)[0]
1437 b2e8a4d9 Michael Hanselmann
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1438 7beb1e53 Guido Trotter
    return job_id
1439 2971c913 Iustin Pop
1440 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1441 2971c913 Iustin Pop
  @_RequireOpenQueue
1442 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1443 2971c913 Iustin Pop
    """Create and store multiple jobs.
1444 2971c913 Iustin Pop

1445 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1446 2971c913 Iustin Pop

1447 2971c913 Iustin Pop
    """
1448 2971c913 Iustin Pop
    results = []
1449 7beb1e53 Guido Trotter
    tasks = []
1450 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1451 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1452 2971c913 Iustin Pop
      try:
1453 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1454 2971c913 Iustin Pop
        status = True
1455 7beb1e53 Guido Trotter
        data = job_id
1456 2971c913 Iustin Pop
      except errors.GenericError, err:
1457 2971c913 Iustin Pop
        data = str(err)
1458 2971c913 Iustin Pop
        status = False
1459 2971c913 Iustin Pop
      results.append((status, data))
1460 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1461 2971c913 Iustin Pop
1462 2971c913 Iustin Pop
    return results
1463 2971c913 Iustin Pop
1464 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1465 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
1466 ea03467c Iustin Pop
    """Update a job's on disk storage.
1467 ea03467c Iustin Pop

1468 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1469 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1470 ea03467c Iustin Pop
    nodes.
1471 ea03467c Iustin Pop

1472 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1473 ea03467c Iustin Pop
    @param job: the changed job
1474 4c36bdf5 Guido Trotter
    @type replicate: boolean
1475 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
1476 ea03467c Iustin Pop

1477 ea03467c Iustin Pop
    """
1478 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1479 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1480 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1481 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
1482 ac0930b9 Iustin Pop
1483 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1484 5c735209 Iustin Pop
                        timeout):
1485 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1486 6c5a7090 Michael Hanselmann

1487 6c5a7090 Michael Hanselmann
    @type job_id: string
1488 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1489 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1490 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1491 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1492 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1493 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1494 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1495 5c735209 Iustin Pop
    @type timeout: float
1496 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
1497 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1498 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1499 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1500 ea03467c Iustin Pop

1501 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1502 ea03467c Iustin Pop
        we instead return a special value,
1503 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1504 ea03467c Iustin Pop
        as such by the clients
1505 6c5a7090 Michael Hanselmann

1506 6c5a7090 Michael Hanselmann
    """
1507 989a8bee Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1508 989a8bee Michael Hanselmann
1509 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
1510 989a8bee Michael Hanselmann
1511 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
1512 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
1513 dfe57c22 Michael Hanselmann
1514 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1515 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1516 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1517 188c5e0a Michael Hanselmann
    """Cancels a job.
1518 188c5e0a Michael Hanselmann

1519 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1520 ea03467c Iustin Pop

1521 188c5e0a Michael Hanselmann
    @type job_id: string
1522 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1523 188c5e0a Michael Hanselmann

1524 188c5e0a Michael Hanselmann
    """
1525 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1526 188c5e0a Michael Hanselmann
1527 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1528 188c5e0a Michael Hanselmann
    if not job:
1529 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1530 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1531 fbf0262f Michael Hanselmann
1532 099b2870 Michael Hanselmann
    (success, msg) = job.Cancel()
1533 188c5e0a Michael Hanselmann
1534 099b2870 Michael Hanselmann
    if success:
1535 099b2870 Michael Hanselmann
      self.UpdateJobUnlocked(job)
1536 fbf0262f Michael Hanselmann
1537 099b2870 Michael Hanselmann
    return (success, msg)
1538 fbf0262f Michael Hanselmann
1539 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1540 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1541 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1542 c609f802 Michael Hanselmann

1543 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1544 25e7b43f Iustin Pop
    @param jobs: Job objects
1545 d7fd1f28 Michael Hanselmann
    @rtype: int
1546 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1547 c609f802 Michael Hanselmann

1548 c609f802 Michael Hanselmann
    """
1549 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1550 d7fd1f28 Michael Hanselmann
    rename_files = []
1551 d7fd1f28 Michael Hanselmann
    for job in jobs:
1552 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1553 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1554 d7fd1f28 Michael Hanselmann
        continue
1555 c609f802 Michael Hanselmann
1556 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1557 c609f802 Michael Hanselmann
1558 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1559 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1560 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1561 c609f802 Michael Hanselmann
1562 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1563 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1564 f1da30e6 Michael Hanselmann
1565 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1566 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
1567 d7fd1f28 Michael Hanselmann
1568 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1569 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
1570 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
1571 20571a26 Guido Trotter
    # archived jobs to fix this.
1572 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1573 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1574 78d12585 Michael Hanselmann
1575 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1576 07cd723a Iustin Pop
  @_RequireOpenQueue
1577 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1578 07cd723a Iustin Pop
    """Archives a job.
1579 07cd723a Iustin Pop

1580 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1581 ea03467c Iustin Pop

1582 07cd723a Iustin Pop
    @type job_id: string
1583 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1584 78d12585 Michael Hanselmann
    @rtype: bool
1585 78d12585 Michael Hanselmann
    @return: Whether job was archived
1586 07cd723a Iustin Pop

1587 07cd723a Iustin Pop
    """
1588 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1589 78d12585 Michael Hanselmann
1590 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1591 78d12585 Michael Hanselmann
    if not job:
1592 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1593 78d12585 Michael Hanselmann
      return False
1594 78d12585 Michael Hanselmann
1595 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1596 07cd723a Iustin Pop
1597 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1598 07cd723a Iustin Pop
  @_RequireOpenQueue
1599 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1600 07cd723a Iustin Pop
    """Archives all jobs based on age.
1601 07cd723a Iustin Pop

1602 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1603 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1604 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1605 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1606 07cd723a Iustin Pop

1607 07cd723a Iustin Pop
    @type age: int
1608 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1609 07cd723a Iustin Pop

1610 07cd723a Iustin Pop
    """
1611 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1612 07cd723a Iustin Pop
1613 07cd723a Iustin Pop
    now = time.time()
1614 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1615 f8ad5591 Michael Hanselmann
    archived_count = 0
1616 f8ad5591 Michael Hanselmann
    last_touched = 0
1617 f8ad5591 Michael Hanselmann
1618 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
1619 d7fd1f28 Michael Hanselmann
    pending = []
1620 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1621 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
1622 f8ad5591 Michael Hanselmann
1623 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1624 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1625 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1626 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1627 f8ad5591 Michael Hanselmann
        break
1628 f8ad5591 Michael Hanselmann
1629 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1630 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1631 f8ad5591 Michael Hanselmann
      if job:
1632 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1633 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1634 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1635 f8ad5591 Michael Hanselmann
          else:
1636 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1637 07cd723a Iustin Pop
        else:
1638 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1639 f8ad5591 Michael Hanselmann
1640 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1641 d7fd1f28 Michael Hanselmann
          pending.append(job)
1642 d7fd1f28 Michael Hanselmann
1643 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1644 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1645 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1646 d7fd1f28 Michael Hanselmann
            pending = []
1647 f8ad5591 Michael Hanselmann
1648 d7fd1f28 Michael Hanselmann
    if pending:
1649 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1650 07cd723a Iustin Pop
1651 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
1652 07cd723a Iustin Pop
1653 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1654 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1655 e2715f69 Michael Hanselmann

1656 ea03467c Iustin Pop
    @type job_ids: list
1657 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1658 ea03467c Iustin Pop
    @type fields: list
1659 ea03467c Iustin Pop
    @param fields: names of fields to return
1660 ea03467c Iustin Pop
    @rtype: list
1661 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1662 ea03467c Iustin Pop
        the requested fields
1663 e2715f69 Michael Hanselmann

1664 e2715f69 Michael Hanselmann
    """
1665 85f03e0d Michael Hanselmann
    jobs = []
1666 9f7b4967 Guido Trotter
    list_all = False
1667 9f7b4967 Guido Trotter
    if not job_ids:
1668 9f7b4967 Guido Trotter
      # Since files are added to/removed from the queue atomically, there's no
1669 9f7b4967 Guido Trotter
      # risk of getting the job ids in an inconsistent state.
1670 9f7b4967 Guido Trotter
      job_ids = self._GetJobIDsUnlocked()
1671 9f7b4967 Guido Trotter
      list_all = True
1672 e2715f69 Michael Hanselmann
1673 9f7b4967 Guido Trotter
    for job_id in job_ids:
1674 9f7b4967 Guido Trotter
      job = self.SafeLoadJobFromDisk(job_id)
1675 9f7b4967 Guido Trotter
      if job is not None:
1676 6a290889 Guido Trotter
        jobs.append(job.GetInfo(fields))
1677 9f7b4967 Guido Trotter
      elif not list_all:
1678 9f7b4967 Guido Trotter
        jobs.append(None)
1679 e2715f69 Michael Hanselmann
1680 85f03e0d Michael Hanselmann
    return jobs
1681 e2715f69 Michael Hanselmann
1682 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1683 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1684 e2715f69 Michael Hanselmann
  def Shutdown(self):
1685 e2715f69 Michael Hanselmann
    """Stops the job queue.
1686 e2715f69 Michael Hanselmann

1687 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1688 ea03467c Iustin Pop

1689 e2715f69 Michael Hanselmann
    """
1690 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1691 85f03e0d Michael Hanselmann
1692 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
1693 a71f9c7d Guido Trotter
    self._queue_filelock = None