Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 6760e4ed

History | View | Annotate | Download (49 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 7f93570a Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 f1da30e6 Michael Hanselmann
import errno
35 f1da30e6 Michael Hanselmann
import re
36 f1048938 Iustin Pop
import time
37 5685c1a5 Michael Hanselmann
import weakref
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 a744b676 Manuel Franceschini
from ganeti import netutils
57 989a8bee Michael Hanselmann
from ganeti import compat
58 e2715f69 Michael Hanselmann
59 fbf0262f Michael Hanselmann
60 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
61 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62 e2715f69 Michael Hanselmann
63 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
64 ebb80afa Guido Trotter
_LOCK = "_lock"
65 ebb80afa Guido Trotter
_QUEUE = "_queue"
66 99bd4f0a Guido Trotter
67 498ae1cc Iustin Pop
68 9728ae5d Iustin Pop
class CancelJob(Exception):
69 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
70 fbf0262f Michael Hanselmann

71 fbf0262f Michael Hanselmann
  """
72 fbf0262f Michael Hanselmann
73 fbf0262f Michael Hanselmann
74 70552c46 Michael Hanselmann
def TimeStampNow():
75 ea03467c Iustin Pop
  """Returns the current timestamp.
76 ea03467c Iustin Pop

77 ea03467c Iustin Pop
  @rtype: tuple
78 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
79 ea03467c Iustin Pop

80 ea03467c Iustin Pop
  """
81 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
82 70552c46 Michael Hanselmann
83 70552c46 Michael Hanselmann
84 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
85 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
86 e2715f69 Michael Hanselmann

87 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
88 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
89 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
90 ea03467c Iustin Pop
  @ivar status: the current status
91 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
92 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
93 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
95 f1048938 Iustin Pop

96 e2715f69 Michael Hanselmann
  """
97 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
98 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
99 66d895a8 Iustin Pop
               "__weakref__"]
100 66d895a8 Iustin Pop
101 85f03e0d Michael Hanselmann
  def __init__(self, op):
102 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
103 ea03467c Iustin Pop

104 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
105 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
106 ea03467c Iustin Pop

107 ea03467c Iustin Pop
    """
108 85f03e0d Michael Hanselmann
    self.input = op
109 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
110 85f03e0d Michael Hanselmann
    self.result = None
111 85f03e0d Michael Hanselmann
    self.log = []
112 70552c46 Michael Hanselmann
    self.start_timestamp = None
113 b9b5abcb Iustin Pop
    self.exec_timestamp = None
114 70552c46 Michael Hanselmann
    self.end_timestamp = None
115 f1da30e6 Michael Hanselmann
116 f1da30e6 Michael Hanselmann
  @classmethod
117 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
118 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
119 ea03467c Iustin Pop

120 ea03467c Iustin Pop
    @type state: dict
121 ea03467c Iustin Pop
    @param state: the serialized state
122 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
123 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
124 ea03467c Iustin Pop

125 ea03467c Iustin Pop
    """
126 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
127 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128 85f03e0d Michael Hanselmann
    obj.status = state["status"]
129 85f03e0d Michael Hanselmann
    obj.result = state["result"]
130 85f03e0d Michael Hanselmann
    obj.log = state["log"]
131 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
132 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
133 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
134 f1da30e6 Michael Hanselmann
    return obj
135 f1da30e6 Michael Hanselmann
136 f1da30e6 Michael Hanselmann
  def Serialize(self):
137 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
138 ea03467c Iustin Pop

139 ea03467c Iustin Pop
    @rtype: dict
140 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
141 ea03467c Iustin Pop

142 ea03467c Iustin Pop
    """
143 6c5a7090 Michael Hanselmann
    return {
144 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
145 6c5a7090 Michael Hanselmann
      "status": self.status,
146 6c5a7090 Michael Hanselmann
      "result": self.result,
147 6c5a7090 Michael Hanselmann
      "log": self.log,
148 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
149 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
150 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
151 6c5a7090 Michael Hanselmann
      }
152 f1048938 Iustin Pop
153 e2715f69 Michael Hanselmann
154 e2715f69 Michael Hanselmann
class _QueuedJob(object):
155 e2715f69 Michael Hanselmann
  """In-memory job representation.
156 e2715f69 Michael Hanselmann

157 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
158 ea03467c Iustin Pop
  be taken care of by users of this class.
159 ea03467c Iustin Pop

160 ea03467c Iustin Pop
  @type queue: L{JobQueue}
161 ea03467c Iustin Pop
  @ivar queue: the parent queue
162 ea03467c Iustin Pop
  @ivar id: the job ID
163 ea03467c Iustin Pop
  @type ops: list
164 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
165 ea03467c Iustin Pop
  @type log_serial: int
166 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
167 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
168 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
169 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
170 e2715f69 Michael Hanselmann

171 e2715f69 Michael Hanselmann
  """
172 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
173 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
174 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
175 66d895a8 Iustin Pop
               "__weakref__"]
176 66d895a8 Iustin Pop
177 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
178 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
179 ea03467c Iustin Pop

180 ea03467c Iustin Pop
    @type queue: L{JobQueue}
181 ea03467c Iustin Pop
    @param queue: our parent queue
182 ea03467c Iustin Pop
    @type job_id: job_id
183 ea03467c Iustin Pop
    @param job_id: our job id
184 ea03467c Iustin Pop
    @type ops: list
185 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
186 ea03467c Iustin Pop
        in _QueuedOpCodes
187 ea03467c Iustin Pop

188 ea03467c Iustin Pop
    """
189 e2715f69 Michael Hanselmann
    if not ops:
190 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
191 e2715f69 Michael Hanselmann
192 85f03e0d Michael Hanselmann
    self.queue = queue
193 f1da30e6 Michael Hanselmann
    self.id = job_id
194 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
195 6c5a7090 Michael Hanselmann
    self.log_serial = 0
196 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
197 c56ec146 Iustin Pop
    self.start_timestamp = None
198 c56ec146 Iustin Pop
    self.end_timestamp = None
199 6c5a7090 Michael Hanselmann
200 9fa2e150 Michael Hanselmann
  def __repr__(self):
201 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
202 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
203 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
204 9fa2e150 Michael Hanselmann
205 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
206 9fa2e150 Michael Hanselmann
207 f1da30e6 Michael Hanselmann
  @classmethod
208 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
209 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
210 ea03467c Iustin Pop

211 ea03467c Iustin Pop
    @type queue: L{JobQueue}
212 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
213 ea03467c Iustin Pop
    @type state: dict
214 ea03467c Iustin Pop
    @param state: the serialized state
215 ea03467c Iustin Pop
    @rtype: _JobQueue
216 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
217 ea03467c Iustin Pop

218 ea03467c Iustin Pop
    """
219 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
220 85f03e0d Michael Hanselmann
    obj.queue = queue
221 85f03e0d Michael Hanselmann
    obj.id = state["id"]
222 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
223 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
224 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
225 6c5a7090 Michael Hanselmann
226 6c5a7090 Michael Hanselmann
    obj.ops = []
227 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
228 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
229 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
230 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
231 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
232 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
233 6c5a7090 Michael Hanselmann
234 f1da30e6 Michael Hanselmann
    return obj
235 f1da30e6 Michael Hanselmann
236 f1da30e6 Michael Hanselmann
  def Serialize(self):
237 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
238 ea03467c Iustin Pop

239 ea03467c Iustin Pop
    @rtype: dict
240 ea03467c Iustin Pop
    @return: the serialized state
241 ea03467c Iustin Pop

242 ea03467c Iustin Pop
    """
243 f1da30e6 Michael Hanselmann
    return {
244 f1da30e6 Michael Hanselmann
      "id": self.id,
245 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
246 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
247 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
248 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
249 f1da30e6 Michael Hanselmann
      }
250 f1da30e6 Michael Hanselmann
251 85f03e0d Michael Hanselmann
  def CalcStatus(self):
252 ea03467c Iustin Pop
    """Compute the status of this job.
253 ea03467c Iustin Pop

254 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
255 ea03467c Iustin Pop
    based on their status, computes the job status.
256 ea03467c Iustin Pop

257 ea03467c Iustin Pop
    The algorithm is:
258 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
259 ea03467c Iustin Pop
        status will be the same
260 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
261 ea03467c Iustin Pop
          - waitlock
262 fbf0262f Michael Hanselmann
          - canceling
263 ea03467c Iustin Pop
          - running
264 ea03467c Iustin Pop

265 ea03467c Iustin Pop
        will determine the job status
266 ea03467c Iustin Pop

267 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
268 ea03467c Iustin Pop
        and the job status will be the same
269 ea03467c Iustin Pop

270 ea03467c Iustin Pop
    @return: the job status
271 ea03467c Iustin Pop

272 ea03467c Iustin Pop
    """
273 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
274 e2715f69 Michael Hanselmann
275 e2715f69 Michael Hanselmann
    all_success = True
276 85f03e0d Michael Hanselmann
    for op in self.ops:
277 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
278 e2715f69 Michael Hanselmann
        continue
279 e2715f69 Michael Hanselmann
280 e2715f69 Michael Hanselmann
      all_success = False
281 e2715f69 Michael Hanselmann
282 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
283 e2715f69 Michael Hanselmann
        pass
284 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
285 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
286 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
287 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
288 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
289 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
290 fbf0262f Michael Hanselmann
        break
291 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
292 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
293 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
294 f1da30e6 Michael Hanselmann
        break
295 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
296 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
297 4cb1d919 Michael Hanselmann
        break
298 e2715f69 Michael Hanselmann
299 e2715f69 Michael Hanselmann
    if all_success:
300 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
301 e2715f69 Michael Hanselmann
302 e2715f69 Michael Hanselmann
    return status
303 e2715f69 Michael Hanselmann
304 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
305 ea03467c Iustin Pop
    """Selectively returns the log entries.
306 ea03467c Iustin Pop

307 ea03467c Iustin Pop
    @type newer_than: None or int
308 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
309 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
310 ea03467c Iustin Pop
        than this value
311 ea03467c Iustin Pop
    @rtype: list
312 ea03467c Iustin Pop
    @return: the list of the log entries selected
313 ea03467c Iustin Pop

314 ea03467c Iustin Pop
    """
315 6c5a7090 Michael Hanselmann
    if newer_than is None:
316 6c5a7090 Michael Hanselmann
      serial = -1
317 6c5a7090 Michael Hanselmann
    else:
318 6c5a7090 Michael Hanselmann
      serial = newer_than
319 6c5a7090 Michael Hanselmann
320 6c5a7090 Michael Hanselmann
    entries = []
321 6c5a7090 Michael Hanselmann
    for op in self.ops:
322 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
323 6c5a7090 Michael Hanselmann
324 6c5a7090 Michael Hanselmann
    return entries
325 6c5a7090 Michael Hanselmann
326 6a290889 Guido Trotter
  def GetInfo(self, fields):
327 6a290889 Guido Trotter
    """Returns information about a job.
328 6a290889 Guido Trotter

329 6a290889 Guido Trotter
    @type fields: list
330 6a290889 Guido Trotter
    @param fields: names of fields to return
331 6a290889 Guido Trotter
    @rtype: list
332 6a290889 Guido Trotter
    @return: list with one element for each field
333 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
334 6a290889 Guido Trotter
        has been passed
335 6a290889 Guido Trotter

336 6a290889 Guido Trotter
    """
337 6a290889 Guido Trotter
    row = []
338 6a290889 Guido Trotter
    for fname in fields:
339 6a290889 Guido Trotter
      if fname == "id":
340 6a290889 Guido Trotter
        row.append(self.id)
341 6a290889 Guido Trotter
      elif fname == "status":
342 6a290889 Guido Trotter
        row.append(self.CalcStatus())
343 6a290889 Guido Trotter
      elif fname == "ops":
344 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
345 6a290889 Guido Trotter
      elif fname == "opresult":
346 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
347 6a290889 Guido Trotter
      elif fname == "opstatus":
348 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
349 6a290889 Guido Trotter
      elif fname == "oplog":
350 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
351 6a290889 Guido Trotter
      elif fname == "opstart":
352 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
353 6a290889 Guido Trotter
      elif fname == "opexec":
354 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
355 6a290889 Guido Trotter
      elif fname == "opend":
356 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
357 6a290889 Guido Trotter
      elif fname == "received_ts":
358 6a290889 Guido Trotter
        row.append(self.received_timestamp)
359 6a290889 Guido Trotter
      elif fname == "start_ts":
360 6a290889 Guido Trotter
        row.append(self.start_timestamp)
361 6a290889 Guido Trotter
      elif fname == "end_ts":
362 6a290889 Guido Trotter
        row.append(self.end_timestamp)
363 6a290889 Guido Trotter
      elif fname == "summary":
364 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
365 6a290889 Guido Trotter
      else:
366 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
367 6a290889 Guido Trotter
    return row
368 6a290889 Guido Trotter
369 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
370 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
371 34327f51 Iustin Pop

372 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
373 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
374 34327f51 Iustin Pop
    finalised are not changed.
375 34327f51 Iustin Pop

376 34327f51 Iustin Pop
    @param status: a given opcode status
377 34327f51 Iustin Pop
    @param result: the opcode result
378 34327f51 Iustin Pop

379 34327f51 Iustin Pop
    """
380 39ed3a98 Guido Trotter
    try:
381 39ed3a98 Guido Trotter
      not_marked = True
382 39ed3a98 Guido Trotter
      for op in self.ops:
383 39ed3a98 Guido Trotter
        if op.status in constants.OPS_FINALIZED:
384 39ed3a98 Guido Trotter
          assert not_marked, "Finalized opcodes found after non-finalized ones"
385 39ed3a98 Guido Trotter
          continue
386 39ed3a98 Guido Trotter
        op.status = status
387 39ed3a98 Guido Trotter
        op.result = result
388 39ed3a98 Guido Trotter
        not_marked = False
389 39ed3a98 Guido Trotter
    finally:
390 39ed3a98 Guido Trotter
      self.queue.UpdateJobUnlocked(self)
391 34327f51 Iustin Pop
392 f1048938 Iustin Pop
393 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
394 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
395 031a3e57 Michael Hanselmann
    """Initializes this class.
396 ea03467c Iustin Pop

397 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
398 031a3e57 Michael Hanselmann
    @param queue: Job queue
399 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
400 031a3e57 Michael Hanselmann
    @param job: Job object
401 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
402 031a3e57 Michael Hanselmann
    @param op: OpCode
403 031a3e57 Michael Hanselmann

404 031a3e57 Michael Hanselmann
    """
405 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
406 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
407 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
408 031a3e57 Michael Hanselmann
409 031a3e57 Michael Hanselmann
    self._queue = queue
410 031a3e57 Michael Hanselmann
    self._job = job
411 031a3e57 Michael Hanselmann
    self._op = op
412 031a3e57 Michael Hanselmann
413 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
414 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
415 dc1e2262 Michael Hanselmann

416 dc1e2262 Michael Hanselmann
    """
417 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
418 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
419 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
420 dc1e2262 Michael Hanselmann
      raise CancelJob()
421 dc1e2262 Michael Hanselmann
422 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
423 031a3e57 Michael Hanselmann
  def NotifyStart(self):
424 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
425 e92376d7 Iustin Pop

426 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
427 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
428 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
429 031a3e57 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
430 e92376d7 Iustin Pop

431 e92376d7 Iustin Pop
    """
432 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
433 271daef8 Iustin Pop
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
434 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
435 fbf0262f Michael Hanselmann
436 271daef8 Iustin Pop
    # Cancel here if we were asked to
437 dc1e2262 Michael Hanselmann
    self._CheckCancel()
438 fbf0262f Michael Hanselmann
439 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
440 9bdab621 Michael Hanselmann
441 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
442 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
443 271daef8 Iustin Pop
444 271daef8 Iustin Pop
    # And finally replicate the job status
445 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
446 031a3e57 Michael Hanselmann
447 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
448 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
449 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
450 9bf5e01f Guido Trotter

451 9bf5e01f Guido Trotter
    """
452 9bf5e01f Guido Trotter
    self._job.log_serial += 1
453 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
454 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
455 9bf5e01f Guido Trotter
456 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
457 031a3e57 Michael Hanselmann
    """Append a log entry.
458 031a3e57 Michael Hanselmann

459 031a3e57 Michael Hanselmann
    """
460 031a3e57 Michael Hanselmann
    assert len(args) < 3
461 031a3e57 Michael Hanselmann
462 031a3e57 Michael Hanselmann
    if len(args) == 1:
463 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
464 031a3e57 Michael Hanselmann
      log_msg = args[0]
465 031a3e57 Michael Hanselmann
    else:
466 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
467 031a3e57 Michael Hanselmann
468 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
469 031a3e57 Michael Hanselmann
    # precision.
470 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
471 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
472 031a3e57 Michael Hanselmann
473 ef2df7d3 Michael Hanselmann
  def ReportLocks(self, msg):
474 ef2df7d3 Michael Hanselmann
    """Write locking information to the job.
475 ef2df7d3 Michael Hanselmann

476 ef2df7d3 Michael Hanselmann
    Called whenever the LU processor is waiting for a lock or has acquired one.
477 ef2df7d3 Michael Hanselmann

478 ef2df7d3 Michael Hanselmann
    """
479 dc1e2262 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
480 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
481 dc1e2262 Michael Hanselmann
482 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
483 dc1e2262 Michael Hanselmann
    self._CheckCancel()
484 dc1e2262 Michael Hanselmann
485 031a3e57 Michael Hanselmann
486 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
487 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
488 989a8bee Michael Hanselmann
    """Initializes this class.
489 6c2549d6 Guido Trotter

490 989a8bee Michael Hanselmann
    @type fields: list of strings
491 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
492 989a8bee Michael Hanselmann
    @type prev_job_info: string
493 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
494 989a8bee Michael Hanselmann
    @type prev_log_serial: string
495 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
496 6c2549d6 Guido Trotter

497 989a8bee Michael Hanselmann
    """
498 989a8bee Michael Hanselmann
    self._fields = fields
499 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
500 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
501 6c2549d6 Guido Trotter
502 989a8bee Michael Hanselmann
  def __call__(self, job):
503 989a8bee Michael Hanselmann
    """Checks whether job has changed.
504 6c2549d6 Guido Trotter

505 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
506 989a8bee Michael Hanselmann
    @param job: Job object
507 6c2549d6 Guido Trotter

508 6c2549d6 Guido Trotter
    """
509 989a8bee Michael Hanselmann
    status = job.CalcStatus()
510 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
511 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
512 6c2549d6 Guido Trotter
513 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
514 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
515 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
516 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
517 6c2549d6 Guido Trotter
    # significantly different.
518 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
519 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
520 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
521 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
522 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
523 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
524 6c2549d6 Guido Trotter
525 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
526 6c2549d6 Guido Trotter
    # no changes.
527 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
528 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
529 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_WAITLOCK) or
530 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
531 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
532 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
533 989a8bee Michael Hanselmann
      return (job_info, log_entries)
534 6c2549d6 Guido Trotter
535 989a8bee Michael Hanselmann
    return None
536 989a8bee Michael Hanselmann
537 989a8bee Michael Hanselmann
538 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
539 989a8bee Michael Hanselmann
  def __init__(self, filename):
540 989a8bee Michael Hanselmann
    """Initializes this class.
541 989a8bee Michael Hanselmann

542 989a8bee Michael Hanselmann
    @type filename: string
543 989a8bee Michael Hanselmann
    @param filename: Path to job file
544 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
545 6c2549d6 Guido Trotter

546 989a8bee Michael Hanselmann
    """
547 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
548 989a8bee Michael Hanselmann
    self._inotify_handler = \
549 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
550 989a8bee Michael Hanselmann
    self._notifier = \
551 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
552 989a8bee Michael Hanselmann
    try:
553 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
554 989a8bee Michael Hanselmann
    except Exception:
555 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
556 989a8bee Michael Hanselmann
      self._notifier.stop()
557 989a8bee Michael Hanselmann
      raise
558 989a8bee Michael Hanselmann
559 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
560 989a8bee Michael Hanselmann
    """Callback for inotify.
561 989a8bee Michael Hanselmann

562 989a8bee Michael Hanselmann
    """
563 6c2549d6 Guido Trotter
    if not notifier_enabled:
564 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
565 989a8bee Michael Hanselmann
566 989a8bee Michael Hanselmann
  def Wait(self, timeout):
567 989a8bee Michael Hanselmann
    """Waits for the job file to change.
568 989a8bee Michael Hanselmann

569 989a8bee Michael Hanselmann
    @type timeout: float
570 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
571 989a8bee Michael Hanselmann
    @return: Whether there have been events
572 989a8bee Michael Hanselmann

573 989a8bee Michael Hanselmann
    """
574 989a8bee Michael Hanselmann
    assert timeout >= 0
575 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
576 989a8bee Michael Hanselmann
    if have_events:
577 989a8bee Michael Hanselmann
      self._notifier.read_events()
578 989a8bee Michael Hanselmann
    self._notifier.process_events()
579 989a8bee Michael Hanselmann
    return have_events
580 989a8bee Michael Hanselmann
581 989a8bee Michael Hanselmann
  def Close(self):
582 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
583 989a8bee Michael Hanselmann

584 989a8bee Michael Hanselmann
    """
585 989a8bee Michael Hanselmann
    self._notifier.stop()
586 989a8bee Michael Hanselmann
587 989a8bee Michael Hanselmann
588 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
589 989a8bee Michael Hanselmann
  def __init__(self, filename):
590 989a8bee Michael Hanselmann
    """Initializes this class.
591 989a8bee Michael Hanselmann

592 989a8bee Michael Hanselmann
    @type filename: string
593 989a8bee Michael Hanselmann
    @param filename: Path to job file
594 989a8bee Michael Hanselmann

595 989a8bee Michael Hanselmann
    """
596 989a8bee Michael Hanselmann
    self._filewaiter = None
597 989a8bee Michael Hanselmann
    self._filename = filename
598 6c2549d6 Guido Trotter
599 989a8bee Michael Hanselmann
  def Wait(self, timeout):
600 989a8bee Michael Hanselmann
    """Waits for a job to change.
601 6c2549d6 Guido Trotter

602 989a8bee Michael Hanselmann
    @type timeout: float
603 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
604 989a8bee Michael Hanselmann
    @return: Whether there have been events
605 989a8bee Michael Hanselmann

606 989a8bee Michael Hanselmann
    """
607 989a8bee Michael Hanselmann
    if self._filewaiter:
608 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
609 989a8bee Michael Hanselmann
610 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
611 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
612 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
613 989a8bee Michael Hanselmann
    # race condition.
614 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
615 989a8bee Michael Hanselmann
616 989a8bee Michael Hanselmann
    return True
617 989a8bee Michael Hanselmann
618 989a8bee Michael Hanselmann
  def Close(self):
619 989a8bee Michael Hanselmann
    """Closes underlying waiter.
620 989a8bee Michael Hanselmann

621 989a8bee Michael Hanselmann
    """
622 989a8bee Michael Hanselmann
    if self._filewaiter:
623 989a8bee Michael Hanselmann
      self._filewaiter.Close()
624 989a8bee Michael Hanselmann
625 989a8bee Michael Hanselmann
626 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
627 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
628 989a8bee Michael Hanselmann

629 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
630 989a8bee Michael Hanselmann
  the current job status has changed.
631 989a8bee Michael Hanselmann

632 989a8bee Michael Hanselmann
  """
633 989a8bee Michael Hanselmann
  @staticmethod
634 989a8bee Michael Hanselmann
  def _CheckForChanges(job_load_fn, check_fn):
635 989a8bee Michael Hanselmann
    job = job_load_fn()
636 989a8bee Michael Hanselmann
    if not job:
637 989a8bee Michael Hanselmann
      raise errors.JobLost()
638 989a8bee Michael Hanselmann
639 989a8bee Michael Hanselmann
    result = check_fn(job)
640 989a8bee Michael Hanselmann
    if result is None:
641 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
642 989a8bee Michael Hanselmann
643 989a8bee Michael Hanselmann
    return result
644 989a8bee Michael Hanselmann
645 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
646 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
647 989a8bee Michael Hanselmann
    """Waits for changes on a job.
648 989a8bee Michael Hanselmann

649 989a8bee Michael Hanselmann
    @type filename: string
650 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
651 989a8bee Michael Hanselmann
    @type job_load_fn: callable
652 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
653 989a8bee Michael Hanselmann
    @type fields: list of strings
654 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
655 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
656 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
657 989a8bee Michael Hanselmann
    @type prev_log_serial: int
658 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
659 989a8bee Michael Hanselmann
    @type timeout: float
660 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
661 989a8bee Michael Hanselmann

662 989a8bee Michael Hanselmann
    """
663 6c2549d6 Guido Trotter
    try:
664 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
665 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
666 989a8bee Michael Hanselmann
      try:
667 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
668 989a8bee Michael Hanselmann
                                          job_load_fn, check_fn),
669 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
670 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
671 989a8bee Michael Hanselmann
      finally:
672 989a8bee Michael Hanselmann
        waiter.Close()
673 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
674 6c2549d6 Guido Trotter
      return None
675 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
676 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
677 6c2549d6 Guido Trotter
678 6c2549d6 Guido Trotter
679 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
680 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
681 6760e4ed Michael Hanselmann

682 6760e4ed Michael Hanselmann
  """
683 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
684 6760e4ed Michael Hanselmann
    to_encode = err
685 6760e4ed Michael Hanselmann
  else:
686 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
687 6760e4ed Michael Hanselmann
688 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
689 6760e4ed Michael Hanselmann
690 6760e4ed Michael Hanselmann
691 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
692 031a3e57 Michael Hanselmann
  """The actual job workers.
693 031a3e57 Michael Hanselmann

694 031a3e57 Michael Hanselmann
  """
695 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
696 e2715f69 Michael Hanselmann
    """Job executor.
697 e2715f69 Michael Hanselmann

698 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
699 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
700 e2715f69 Michael Hanselmann

701 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
702 ea03467c Iustin Pop
    @param job: the job to be processed
703 ea03467c Iustin Pop

704 e2715f69 Michael Hanselmann
    """
705 daba67c7 Michael Hanselmann
    self.SetTaskName("Job%s" % job.id)
706 daba67c7 Michael Hanselmann
707 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
708 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
709 031a3e57 Michael Hanselmann
    queue = job.queue
710 e2715f69 Michael Hanselmann
    try:
711 85f03e0d Michael Hanselmann
      try:
712 85f03e0d Michael Hanselmann
        count = len(job.ops)
713 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
714 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
715 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
716 f6424741 Iustin Pop
            # this is a job that was partially completed before master
717 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
718 f6424741 Iustin Pop
            # are already completed successfully (if any did error
719 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
720 f6424741 Iustin Pop
            # resubmitted for processing)
721 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
722 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
723 f6424741 Iustin Pop
            continue
724 85f03e0d Michael Hanselmann
          try:
725 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726 d21d09d6 Iustin Pop
                         op_summary)
727 85f03e0d Michael Hanselmann
728 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
729 85f03e0d Michael Hanselmann
            try:
730 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
731 e35344b4 Michael Hanselmann
                logging.debug("Canceling opcode")
732 df0fb067 Iustin Pop
                raise CancelJob()
733 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
734 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s waiting for locks",
735 e35344b4 Michael Hanselmann
                            idx + 1, count)
736 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
737 85f03e0d Michael Hanselmann
              op.result = None
738 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
739 c56ec146 Iustin Pop
              if idx == 0: # first opcode
740 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
741 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
742 85f03e0d Michael Hanselmann
743 38206f3c Iustin Pop
              input_opcode = op.input
744 85f03e0d Michael Hanselmann
            finally:
745 85f03e0d Michael Hanselmann
              queue.release()
746 85f03e0d Michael Hanselmann
747 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
748 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
749 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
750 85f03e0d Michael Hanselmann
751 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
752 85f03e0d Michael Hanselmann
            try:
753 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
755 85f03e0d Michael Hanselmann
              op.result = result
756 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
757 6ea72e43 Michael Hanselmann
              if idx == count - 1:
758 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
759 963a068b Michael Hanselmann
760 963a068b Michael Hanselmann
                # Consistency check
761 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
762 963a068b Michael Hanselmann
                                  for i in job.ops)
763 963a068b Michael Hanselmann
764 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
765 85f03e0d Michael Hanselmann
            finally:
766 85f03e0d Michael Hanselmann
              queue.release()
767 85f03e0d Michael Hanselmann
768 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
769 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
770 fbf0262f Michael Hanselmann
          except CancelJob:
771 fbf0262f Michael Hanselmann
            # Will be handled further up
772 fbf0262f Michael Hanselmann
            raise
773 85f03e0d Michael Hanselmann
          except Exception, err:
774 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
775 85f03e0d Michael Hanselmann
            try:
776 85f03e0d Michael Hanselmann
              try:
777 e35344b4 Michael Hanselmann
                logging.debug("Opcode %s/%s failed", idx + 1, count)
778 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
779 6760e4ed Michael Hanselmann
                op.result = _EncodeOpError(err)
780 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
781 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
782 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
783 963a068b Michael Hanselmann
784 963a068b Michael Hanselmann
                to_encode = errors.OpExecError("Preceding opcode failed")
785 963a068b Michael Hanselmann
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
786 6760e4ed Michael Hanselmann
                                      _EncodeOpError(to_encode))
787 963a068b Michael Hanselmann
788 963a068b Michael Hanselmann
                # Consistency check
789 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
790 963a068b Michael Hanselmann
                                  for i in job.ops[:idx])
791 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
792 963a068b Michael Hanselmann
                                  errors.GetEncodedError(i.result)
793 963a068b Michael Hanselmann
                                  for i in job.ops[idx:])
794 85f03e0d Michael Hanselmann
              finally:
795 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
796 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
797 85f03e0d Michael Hanselmann
            finally:
798 85f03e0d Michael Hanselmann
              queue.release()
799 85f03e0d Michael Hanselmann
            raise
800 85f03e0d Michael Hanselmann
801 fbf0262f Michael Hanselmann
      except CancelJob:
802 3c0d60d0 Guido Trotter
        queue.acquire(shared=1)
803 fbf0262f Michael Hanselmann
        try:
804 39ed3a98 Guido Trotter
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
805 39ed3a98 Guido Trotter
                                "Job canceled by request")
806 6ea72e43 Michael Hanselmann
          job.end_timestamp = TimeStampNow()
807 6ea72e43 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
808 fbf0262f Michael Hanselmann
        finally:
809 fbf0262f Michael Hanselmann
          queue.release()
810 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
811 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
812 85f03e0d Michael Hanselmann
      except:
813 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
814 e2715f69 Michael Hanselmann
    finally:
815 6ea72e43 Michael Hanselmann
      status = job.CalcStatus()
816 6ea72e43 Michael Hanselmann
      logging.info("Finished job %s, status = %s", job.id, status)
817 e2715f69 Michael Hanselmann
818 e2715f69 Michael Hanselmann
819 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
820 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
821 ea03467c Iustin Pop

822 ea03467c Iustin Pop
  """
823 5bdce580 Michael Hanselmann
  def __init__(self, queue):
824 89e2b4d2 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
825 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
826 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
827 5bdce580 Michael Hanselmann
    self.queue = queue
828 e2715f69 Michael Hanselmann
829 e2715f69 Michael Hanselmann
830 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
831 6c881c52 Iustin Pop
  """Decorator for "public" functions.
832 ea03467c Iustin Pop

833 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
834 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
835 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
836 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
837 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
838 ea03467c Iustin Pop

839 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
840 f1da30e6 Michael Hanselmann

841 6c881c52 Iustin Pop
  Example::
842 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
843 6c881c52 Iustin Pop
    @_RequireOpenQueue
844 6c881c52 Iustin Pop
    def Example(self):
845 6c881c52 Iustin Pop
      pass
846 db37da70 Michael Hanselmann

847 6c881c52 Iustin Pop
  """
848 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
849 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
850 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
851 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
852 6c881c52 Iustin Pop
  return wrapper
853 db37da70 Michael Hanselmann
854 db37da70 Michael Hanselmann
855 6c881c52 Iustin Pop
class JobQueue(object):
856 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
857 db37da70 Michael Hanselmann

858 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
859 6c881c52 Iustin Pop

860 6c881c52 Iustin Pop
  """
861 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
862 db37da70 Michael Hanselmann
863 85f03e0d Michael Hanselmann
  def __init__(self, context):
864 ea03467c Iustin Pop
    """Constructor for JobQueue.
865 ea03467c Iustin Pop

866 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
867 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
868 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
869 ea03467c Iustin Pop
    running).
870 ea03467c Iustin Pop

871 ea03467c Iustin Pop
    @type context: GanetiContext
872 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
873 ea03467c Iustin Pop
        data and other ganeti objects
874 ea03467c Iustin Pop

875 ea03467c Iustin Pop
    """
876 5bdce580 Michael Hanselmann
    self.context = context
877 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
878 a744b676 Manuel Franceschini
    self._my_hostname = netutils.HostInfo().name
879 f1da30e6 Michael Hanselmann
880 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
881 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
882 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
883 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
884 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
885 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
886 ebb80afa Guido Trotter
887 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
888 ebb80afa Guido Trotter
    self.release = self._lock.release
889 85f03e0d Michael Hanselmann
890 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
891 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
892 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
893 f1da30e6 Michael Hanselmann
894 04ab05ce Michael Hanselmann
    # Read serial file
895 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
896 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
897 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
898 c4beba1c Iustin Pop
899 23752136 Michael Hanselmann
    # Get initial list of nodes
900 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
901 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
902 59303563 Iustin Pop
                       if n.master_candidate)
903 8e00939c Michael Hanselmann
904 8e00939c Michael Hanselmann
    # Remove master node
905 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
906 23752136 Michael Hanselmann
907 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
908 23752136 Michael Hanselmann
909 20571a26 Guido Trotter
    self._queue_size = 0
910 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
911 20571a26 Guido Trotter
    self._drained = self._IsQueueMarkedDrain()
912 20571a26 Guido Trotter
913 85f03e0d Michael Hanselmann
    # Setup worker pool
914 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
915 85f03e0d Michael Hanselmann
    try:
916 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
917 16714921 Michael Hanselmann
      # we're still doing our work.
918 16714921 Michael Hanselmann
      self.acquire()
919 16714921 Michael Hanselmann
      try:
920 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
921 711b5124 Michael Hanselmann
922 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
923 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
924 711b5124 Michael Hanselmann
        lastinfo = time.time()
925 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
926 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
927 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
928 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
929 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
930 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
931 711b5124 Michael Hanselmann
            lastinfo = time.time()
932 711b5124 Michael Hanselmann
933 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
934 711b5124 Michael Hanselmann
935 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
936 16714921 Michael Hanselmann
          if job is None:
937 16714921 Michael Hanselmann
            continue
938 94ed59a5 Iustin Pop
939 16714921 Michael Hanselmann
          status = job.CalcStatus()
940 85f03e0d Michael Hanselmann
941 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
942 b2e8a4d9 Michael Hanselmann
            self._wpool.AddTask((job, ))
943 85f03e0d Michael Hanselmann
944 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
945 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
946 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
947 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
948 39ed3a98 Guido Trotter
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
949 39ed3a98 Guido Trotter
                                  "Unclean master daemon shutdown")
950 711b5124 Michael Hanselmann
951 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
952 16714921 Michael Hanselmann
      finally:
953 16714921 Michael Hanselmann
        self.release()
954 16714921 Michael Hanselmann
    except:
955 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
956 16714921 Michael Hanselmann
      raise
957 85f03e0d Michael Hanselmann
958 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
959 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
960 99aabbed Iustin Pop
  def AddNode(self, node):
961 99aabbed Iustin Pop
    """Register a new node with the queue.
962 99aabbed Iustin Pop

963 99aabbed Iustin Pop
    @type node: L{objects.Node}
964 99aabbed Iustin Pop
    @param node: the node object to be added
965 99aabbed Iustin Pop

966 99aabbed Iustin Pop
    """
967 99aabbed Iustin Pop
    node_name = node.name
968 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
969 23752136 Michael Hanselmann
970 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
971 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
972 3cebe102 Michael Hanselmann
    msg = result.fail_msg
973 c8457ce7 Iustin Pop
    if msg:
974 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
975 c8457ce7 Iustin Pop
                      node_name, msg)
976 23752136 Michael Hanselmann
977 59303563 Iustin Pop
    if not node.master_candidate:
978 59303563 Iustin Pop
      # remove if existing, ignoring errors
979 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
980 59303563 Iustin Pop
      # and skip the replication of the job ids
981 59303563 Iustin Pop
      return
982 59303563 Iustin Pop
983 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
984 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
985 23752136 Michael Hanselmann
986 d2e03a33 Michael Hanselmann
    # Upload current serial file
987 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
988 d2e03a33 Michael Hanselmann
989 d2e03a33 Michael Hanselmann
    for file_name in files:
990 9f774ee8 Michael Hanselmann
      # Read file content
991 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
992 9f774ee8 Michael Hanselmann
993 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
994 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
995 a3811745 Michael Hanselmann
                                                  file_name, content)
996 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
997 c8457ce7 Iustin Pop
      if msg:
998 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
999 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1000 d2e03a33 Michael Hanselmann
1001 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1002 d2e03a33 Michael Hanselmann
1003 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1004 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1005 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1006 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1007 ea03467c Iustin Pop

1008 ea03467c Iustin Pop
    @type node_name: str
1009 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1010 ea03467c Iustin Pop

1011 ea03467c Iustin Pop
    """
1012 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1013 23752136 Michael Hanselmann
1014 7e950d31 Iustin Pop
  @staticmethod
1015 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1016 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1017 ea03467c Iustin Pop

1018 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1019 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1020 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1021 ea03467c Iustin Pop

1022 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1023 ea03467c Iustin Pop
    @type nodes: list
1024 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1025 ea03467c Iustin Pop
    @type failmsg: str
1026 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1027 ea03467c Iustin Pop

1028 ea03467c Iustin Pop
    """
1029 e74798c1 Michael Hanselmann
    failed = []
1030 e74798c1 Michael Hanselmann
    success = []
1031 e74798c1 Michael Hanselmann
1032 e74798c1 Michael Hanselmann
    for node in nodes:
1033 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1034 c8457ce7 Iustin Pop
      if msg:
1035 e74798c1 Michael Hanselmann
        failed.append(node)
1036 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1037 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1038 c8457ce7 Iustin Pop
      else:
1039 c8457ce7 Iustin Pop
        success.append(node)
1040 e74798c1 Michael Hanselmann
1041 e74798c1 Michael Hanselmann
    # +1 for the master node
1042 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1043 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1044 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1045 e74798c1 Michael Hanselmann
1046 99aabbed Iustin Pop
  def _GetNodeIp(self):
1047 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1048 99aabbed Iustin Pop

1049 ea03467c Iustin Pop
    @rtype: (list, list)
1050 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1051 ea03467c Iustin Pop
        names and the second one with the node addresses
1052 ea03467c Iustin Pop

1053 99aabbed Iustin Pop
    """
1054 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1055 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1056 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1057 99aabbed Iustin Pop
    return name_list, addr_list
1058 99aabbed Iustin Pop
1059 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1060 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1061 8e00939c Michael Hanselmann

1062 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1063 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1064 ea03467c Iustin Pop

1065 ea03467c Iustin Pop
    @type file_name: str
1066 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1067 ea03467c Iustin Pop
    @type data: str
1068 ea03467c Iustin Pop
    @param data: the new contents of the file
1069 4c36bdf5 Guido Trotter
    @type replicate: boolean
1070 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1071 ea03467c Iustin Pop

1072 8e00939c Michael Hanselmann
    """
1073 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
1074 8e00939c Michael Hanselmann
1075 4c36bdf5 Guido Trotter
    if replicate:
1076 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1077 4c36bdf5 Guido Trotter
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1078 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1079 23752136 Michael Hanselmann
1080 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1081 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1082 ea03467c Iustin Pop

1083 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1084 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1085 ea03467c Iustin Pop

1086 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1087 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1088 ea03467c Iustin Pop

1089 ea03467c Iustin Pop
    """
1090 dd875d32 Michael Hanselmann
    # Rename them locally
1091 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1092 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1093 abc1f2ce Michael Hanselmann
1094 dd875d32 Michael Hanselmann
    # ... and on all nodes
1095 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1096 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1097 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1098 abc1f2ce Michael Hanselmann
1099 7e950d31 Iustin Pop
  @staticmethod
1100 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
1101 ea03467c Iustin Pop
    """Convert a job ID to string format.
1102 ea03467c Iustin Pop

1103 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
1104 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
1105 ea03467c Iustin Pop
    abstract this change.
1106 ea03467c Iustin Pop

1107 ea03467c Iustin Pop
    @type job_id: int or long
1108 ea03467c Iustin Pop
    @param job_id: the numeric job id
1109 ea03467c Iustin Pop
    @rtype: str
1110 ea03467c Iustin Pop
    @return: the formatted job id
1111 ea03467c Iustin Pop

1112 ea03467c Iustin Pop
    """
1113 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
1114 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1115 85f03e0d Michael Hanselmann
    if job_id < 0:
1116 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1117 85f03e0d Michael Hanselmann
1118 85f03e0d Michael Hanselmann
    return str(job_id)
1119 85f03e0d Michael Hanselmann
1120 58b22b6e Michael Hanselmann
  @classmethod
1121 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
1122 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
1123 58b22b6e Michael Hanselmann

1124 58b22b6e Michael Hanselmann
    @type job_id: str
1125 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
1126 58b22b6e Michael Hanselmann
    @rtype: str
1127 58b22b6e Michael Hanselmann
    @return: Directory name
1128 58b22b6e Michael Hanselmann

1129 58b22b6e Michael Hanselmann
    """
1130 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1131 58b22b6e Michael Hanselmann
1132 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1133 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1134 f1da30e6 Michael Hanselmann

1135 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1136 f1da30e6 Michael Hanselmann

1137 009e73d0 Iustin Pop
    @type count: integer
1138 009e73d0 Iustin Pop
    @param count: how many serials to return
1139 ea03467c Iustin Pop
    @rtype: str
1140 ea03467c Iustin Pop
    @return: a string representing the job identifier.
1141 f1da30e6 Michael Hanselmann

1142 f1da30e6 Michael Hanselmann
    """
1143 009e73d0 Iustin Pop
    assert count > 0
1144 f1da30e6 Michael Hanselmann
    # New number
1145 009e73d0 Iustin Pop
    serial = self._last_serial + count
1146 f1da30e6 Michael Hanselmann
1147 f1da30e6 Michael Hanselmann
    # Write to file
1148 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1149 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1150 f1da30e6 Michael Hanselmann
1151 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
1152 009e73d0 Iustin Pop
              for v in range(self._last_serial, serial + 1)]
1153 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1154 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1155 f1da30e6 Michael Hanselmann
1156 009e73d0 Iustin Pop
    return result
1157 f1da30e6 Michael Hanselmann
1158 85f03e0d Michael Hanselmann
  @staticmethod
1159 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1160 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1161 ea03467c Iustin Pop

1162 ea03467c Iustin Pop
    @type job_id: str
1163 ea03467c Iustin Pop
    @param job_id: the job identifier
1164 ea03467c Iustin Pop
    @rtype: str
1165 ea03467c Iustin Pop
    @return: the path to the job file
1166 ea03467c Iustin Pop

1167 ea03467c Iustin Pop
    """
1168 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1169 f1da30e6 Michael Hanselmann
1170 58b22b6e Michael Hanselmann
  @classmethod
1171 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
1172 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1173 ea03467c Iustin Pop

1174 ea03467c Iustin Pop
    @type job_id: str
1175 ea03467c Iustin Pop
    @param job_id: the job identifier
1176 ea03467c Iustin Pop
    @rtype: str
1177 ea03467c Iustin Pop
    @return: the path to the archived job file
1178 ea03467c Iustin Pop

1179 ea03467c Iustin Pop
    """
1180 0411c011 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1181 0411c011 Iustin Pop
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1182 0cb94105 Michael Hanselmann
1183 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
1184 911a495b Iustin Pop
    """Return all known job IDs.
1185 911a495b Iustin Pop

1186 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1187 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1188 ac0930b9 Iustin Pop
    extra IDs).
1189 ac0930b9 Iustin Pop

1190 85a1c57d Guido Trotter
    @type sort: boolean
1191 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1192 ea03467c Iustin Pop
    @rtype: list
1193 ea03467c Iustin Pop
    @return: the list of job IDs
1194 ea03467c Iustin Pop

1195 911a495b Iustin Pop
    """
1196 85a1c57d Guido Trotter
    jlist = []
1197 b5b8309d Guido Trotter
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1198 85a1c57d Guido Trotter
      m = self._RE_JOB_FILE.match(filename)
1199 85a1c57d Guido Trotter
      if m:
1200 85a1c57d Guido Trotter
        jlist.append(m.group(1))
1201 85a1c57d Guido Trotter
    if sort:
1202 85a1c57d Guido Trotter
      jlist = utils.NiceSort(jlist)
1203 f0d874fe Iustin Pop
    return jlist
1204 911a495b Iustin Pop
1205 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1206 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1207 ea03467c Iustin Pop

1208 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1209 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1210 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1211 ea03467c Iustin Pop

1212 ea03467c Iustin Pop
    @param job_id: the job id
1213 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1214 ea03467c Iustin Pop
    @return: either None or the job object
1215 ea03467c Iustin Pop

1216 ea03467c Iustin Pop
    """
1217 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1218 5685c1a5 Michael Hanselmann
    if job:
1219 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1220 5685c1a5 Michael Hanselmann
      return job
1221 ac0930b9 Iustin Pop
1222 3d6c5566 Guido Trotter
    try:
1223 3d6c5566 Guido Trotter
      job = self._LoadJobFromDisk(job_id)
1224 aa9f8167 Iustin Pop
      if job is None:
1225 aa9f8167 Iustin Pop
        return job
1226 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1227 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1228 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1229 3d6c5566 Guido Trotter
      if old_path == new_path:
1230 3d6c5566 Guido Trotter
        # job already archived (future case)
1231 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1232 3d6c5566 Guido Trotter
      else:
1233 3d6c5566 Guido Trotter
        # non-archived case
1234 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1235 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1236 3d6c5566 Guido Trotter
      return None
1237 162c8636 Guido Trotter
1238 162c8636 Guido Trotter
    self._memcache[job_id] = job
1239 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1240 162c8636 Guido Trotter
    return job
1241 162c8636 Guido Trotter
1242 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1243 162c8636 Guido Trotter
    """Load the given job file from disk.
1244 162c8636 Guido Trotter

1245 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1246 162c8636 Guido Trotter

1247 162c8636 Guido Trotter
    @type job_id: string
1248 162c8636 Guido Trotter
    @param job_id: job identifier
1249 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1250 162c8636 Guido Trotter
    @return: either None or the job object
1251 162c8636 Guido Trotter

1252 162c8636 Guido Trotter
    """
1253 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1254 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1255 f1da30e6 Michael Hanselmann
    try:
1256 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1257 162c8636 Guido Trotter
    except EnvironmentError, err:
1258 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1259 f1da30e6 Michael Hanselmann
        return None
1260 f1da30e6 Michael Hanselmann
      raise
1261 13998ef2 Michael Hanselmann
1262 94ed59a5 Iustin Pop
    try:
1263 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1264 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1265 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1266 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1267 94ed59a5 Iustin Pop
1268 ac0930b9 Iustin Pop
    return job
1269 f1da30e6 Michael Hanselmann
1270 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
1271 0f9c08dc Guido Trotter
    """Load the given job file from disk.
1272 0f9c08dc Guido Trotter

1273 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1274 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
1275 0f9c08dc Guido Trotter
    exception is logged.
1276 0f9c08dc Guido Trotter

1277 0f9c08dc Guido Trotter
    @type job_id: string
1278 0f9c08dc Guido Trotter
    @param job_id: job identifier
1279 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
1280 0f9c08dc Guido Trotter
    @return: either None or the job object
1281 0f9c08dc Guido Trotter

1282 0f9c08dc Guido Trotter
    """
1283 0f9c08dc Guido Trotter
    try:
1284 0f9c08dc Guido Trotter
      return self._LoadJobFromDisk(job_id)
1285 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
1286 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
1287 0f9c08dc Guido Trotter
      return None
1288 0f9c08dc Guido Trotter
1289 686d7433 Iustin Pop
  @staticmethod
1290 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
1291 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
1292 686d7433 Iustin Pop

1293 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
1294 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
1295 686d7433 Iustin Pop

1296 ea03467c Iustin Pop
    @rtype: boolean
1297 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
1298 ea03467c Iustin Pop

1299 686d7433 Iustin Pop
    """
1300 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1301 686d7433 Iustin Pop
1302 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1303 20571a26 Guido Trotter
    """Update the queue size.
1304 20571a26 Guido Trotter

1305 20571a26 Guido Trotter
    """
1306 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1307 20571a26 Guido Trotter
1308 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1309 20571a26 Guido Trotter
  @_RequireOpenQueue
1310 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
1311 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1312 3ccafd0e Iustin Pop

1313 ea03467c Iustin Pop
    @type drain_flag: boolean
1314 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1315 ea03467c Iustin Pop

1316 3ccafd0e Iustin Pop
    """
1317 3ccafd0e Iustin Pop
    if drain_flag:
1318 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1319 3ccafd0e Iustin Pop
    else:
1320 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1321 20571a26 Guido Trotter
1322 20571a26 Guido Trotter
    self._drained = drain_flag
1323 20571a26 Guido Trotter
1324 3ccafd0e Iustin Pop
    return True
1325 3ccafd0e Iustin Pop
1326 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1327 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
1328 85f03e0d Michael Hanselmann
    """Create and store a new job.
1329 f1da30e6 Michael Hanselmann

1330 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
1331 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
1332 c3f0a12f Iustin Pop

1333 009e73d0 Iustin Pop
    @type job_id: job ID
1334 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
1335 c3f0a12f Iustin Pop
    @type ops: list
1336 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
1337 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
1338 7beb1e53 Guido Trotter
    @return: the job object to be queued
1339 7beb1e53 Guido Trotter
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1340 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1341 c3f0a12f Iustin Pop

1342 c3f0a12f Iustin Pop
    """
1343 20571a26 Guido Trotter
    # Ok when sharing the big job queue lock, as the drain file is created when
1344 20571a26 Guido Trotter
    # the lock is exclusive.
1345 20571a26 Guido Trotter
    if self._drained:
1346 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1347 f87b405e Michael Hanselmann
1348 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1349 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
1350 f87b405e Michael Hanselmann
1351 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
1352 f1da30e6 Michael Hanselmann
1353 f1da30e6 Michael Hanselmann
    # Write to disk
1354 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
1355 f1da30e6 Michael Hanselmann
1356 20571a26 Guido Trotter
    self._queue_size += 1
1357 20571a26 Guido Trotter
1358 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1359 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1360 ac0930b9 Iustin Pop
1361 7beb1e53 Guido Trotter
    return job
1362 f1da30e6 Michael Hanselmann
1363 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1364 2971c913 Iustin Pop
  @_RequireOpenQueue
1365 2971c913 Iustin Pop
  def SubmitJob(self, ops):
1366 2971c913 Iustin Pop
    """Create and store a new job.
1367 2971c913 Iustin Pop

1368 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1369 2971c913 Iustin Pop

1370 2971c913 Iustin Pop
    """
1371 009e73d0 Iustin Pop
    job_id = self._NewSerialsUnlocked(1)[0]
1372 b2e8a4d9 Michael Hanselmann
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1373 7beb1e53 Guido Trotter
    return job_id
1374 2971c913 Iustin Pop
1375 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1376 2971c913 Iustin Pop
  @_RequireOpenQueue
1377 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1378 2971c913 Iustin Pop
    """Create and store multiple jobs.
1379 2971c913 Iustin Pop

1380 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1381 2971c913 Iustin Pop

1382 2971c913 Iustin Pop
    """
1383 2971c913 Iustin Pop
    results = []
1384 7beb1e53 Guido Trotter
    tasks = []
1385 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1386 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1387 2971c913 Iustin Pop
      try:
1388 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1389 2971c913 Iustin Pop
        status = True
1390 7beb1e53 Guido Trotter
        data = job_id
1391 2971c913 Iustin Pop
      except errors.GenericError, err:
1392 2971c913 Iustin Pop
        data = str(err)
1393 2971c913 Iustin Pop
        status = False
1394 2971c913 Iustin Pop
      results.append((status, data))
1395 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1396 2971c913 Iustin Pop
1397 2971c913 Iustin Pop
    return results
1398 2971c913 Iustin Pop
1399 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1400 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
1401 ea03467c Iustin Pop
    """Update a job's on disk storage.
1402 ea03467c Iustin Pop

1403 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1404 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1405 ea03467c Iustin Pop
    nodes.
1406 ea03467c Iustin Pop

1407 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1408 ea03467c Iustin Pop
    @param job: the changed job
1409 4c36bdf5 Guido Trotter
    @type replicate: boolean
1410 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
1411 ea03467c Iustin Pop

1412 ea03467c Iustin Pop
    """
1413 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1414 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1415 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1416 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
1417 ac0930b9 Iustin Pop
1418 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1419 5c735209 Iustin Pop
                        timeout):
1420 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1421 6c5a7090 Michael Hanselmann

1422 6c5a7090 Michael Hanselmann
    @type job_id: string
1423 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1424 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1425 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1426 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1427 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1428 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1429 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1430 5c735209 Iustin Pop
    @type timeout: float
1431 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
1432 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1433 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1434 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1435 ea03467c Iustin Pop

1436 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1437 ea03467c Iustin Pop
        we instead return a special value,
1438 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1439 ea03467c Iustin Pop
        as such by the clients
1440 6c5a7090 Michael Hanselmann

1441 6c5a7090 Michael Hanselmann
    """
1442 989a8bee Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1443 989a8bee Michael Hanselmann
1444 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
1445 989a8bee Michael Hanselmann
1446 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
1447 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
1448 dfe57c22 Michael Hanselmann
1449 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1450 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1451 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1452 188c5e0a Michael Hanselmann
    """Cancels a job.
1453 188c5e0a Michael Hanselmann

1454 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1455 ea03467c Iustin Pop

1456 188c5e0a Michael Hanselmann
    @type job_id: string
1457 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1458 188c5e0a Michael Hanselmann

1459 188c5e0a Michael Hanselmann
    """
1460 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1461 188c5e0a Michael Hanselmann
1462 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1463 188c5e0a Michael Hanselmann
    if not job:
1464 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1465 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1466 fbf0262f Michael Hanselmann
1467 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1468 188c5e0a Michael Hanselmann
1469 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1470 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1471 a9e97393 Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1472 a9e97393 Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1473 fbf0262f Michael Hanselmann
1474 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1475 39ed3a98 Guido Trotter
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1476 39ed3a98 Guido Trotter
                            "Job canceled by request")
1477 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1478 188c5e0a Michael Hanselmann
1479 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1480 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1481 39ed3a98 Guido Trotter
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1482 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1483 fbf0262f Michael Hanselmann
1484 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1485 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1486 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1487 c609f802 Michael Hanselmann

1488 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1489 25e7b43f Iustin Pop
    @param jobs: Job objects
1490 d7fd1f28 Michael Hanselmann
    @rtype: int
1491 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1492 c609f802 Michael Hanselmann

1493 c609f802 Michael Hanselmann
    """
1494 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1495 d7fd1f28 Michael Hanselmann
    rename_files = []
1496 d7fd1f28 Michael Hanselmann
    for job in jobs:
1497 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1498 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1499 d7fd1f28 Michael Hanselmann
        continue
1500 c609f802 Michael Hanselmann
1501 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1502 c609f802 Michael Hanselmann
1503 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1504 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1505 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1506 c609f802 Michael Hanselmann
1507 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1508 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1509 f1da30e6 Michael Hanselmann
1510 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1511 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
1512 d7fd1f28 Michael Hanselmann
1513 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1514 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
1515 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
1516 20571a26 Guido Trotter
    # archived jobs to fix this.
1517 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1518 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1519 78d12585 Michael Hanselmann
1520 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1521 07cd723a Iustin Pop
  @_RequireOpenQueue
1522 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1523 07cd723a Iustin Pop
    """Archives a job.
1524 07cd723a Iustin Pop

1525 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1526 ea03467c Iustin Pop

1527 07cd723a Iustin Pop
    @type job_id: string
1528 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1529 78d12585 Michael Hanselmann
    @rtype: bool
1530 78d12585 Michael Hanselmann
    @return: Whether job was archived
1531 07cd723a Iustin Pop

1532 07cd723a Iustin Pop
    """
1533 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1534 78d12585 Michael Hanselmann
1535 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1536 78d12585 Michael Hanselmann
    if not job:
1537 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1538 78d12585 Michael Hanselmann
      return False
1539 78d12585 Michael Hanselmann
1540 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1541 07cd723a Iustin Pop
1542 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1543 07cd723a Iustin Pop
  @_RequireOpenQueue
1544 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1545 07cd723a Iustin Pop
    """Archives all jobs based on age.
1546 07cd723a Iustin Pop

1547 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1548 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1549 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1550 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1551 07cd723a Iustin Pop

1552 07cd723a Iustin Pop
    @type age: int
1553 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1554 07cd723a Iustin Pop

1555 07cd723a Iustin Pop
    """
1556 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1557 07cd723a Iustin Pop
1558 07cd723a Iustin Pop
    now = time.time()
1559 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1560 f8ad5591 Michael Hanselmann
    archived_count = 0
1561 f8ad5591 Michael Hanselmann
    last_touched = 0
1562 f8ad5591 Michael Hanselmann
1563 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
1564 d7fd1f28 Michael Hanselmann
    pending = []
1565 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1566 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
1567 f8ad5591 Michael Hanselmann
1568 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1569 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1570 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1571 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1572 f8ad5591 Michael Hanselmann
        break
1573 f8ad5591 Michael Hanselmann
1574 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1575 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1576 f8ad5591 Michael Hanselmann
      if job:
1577 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1578 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1579 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1580 f8ad5591 Michael Hanselmann
          else:
1581 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1582 07cd723a Iustin Pop
        else:
1583 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1584 f8ad5591 Michael Hanselmann
1585 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1586 d7fd1f28 Michael Hanselmann
          pending.append(job)
1587 d7fd1f28 Michael Hanselmann
1588 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1589 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1590 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1591 d7fd1f28 Michael Hanselmann
            pending = []
1592 f8ad5591 Michael Hanselmann
1593 d7fd1f28 Michael Hanselmann
    if pending:
1594 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1595 07cd723a Iustin Pop
1596 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
1597 07cd723a Iustin Pop
1598 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1599 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1600 e2715f69 Michael Hanselmann

1601 ea03467c Iustin Pop
    @type job_ids: list
1602 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1603 ea03467c Iustin Pop
    @type fields: list
1604 ea03467c Iustin Pop
    @param fields: names of fields to return
1605 ea03467c Iustin Pop
    @rtype: list
1606 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1607 ea03467c Iustin Pop
        the requested fields
1608 e2715f69 Michael Hanselmann

1609 e2715f69 Michael Hanselmann
    """
1610 85f03e0d Michael Hanselmann
    jobs = []
1611 9f7b4967 Guido Trotter
    list_all = False
1612 9f7b4967 Guido Trotter
    if not job_ids:
1613 9f7b4967 Guido Trotter
      # Since files are added to/removed from the queue atomically, there's no
1614 9f7b4967 Guido Trotter
      # risk of getting the job ids in an inconsistent state.
1615 9f7b4967 Guido Trotter
      job_ids = self._GetJobIDsUnlocked()
1616 9f7b4967 Guido Trotter
      list_all = True
1617 e2715f69 Michael Hanselmann
1618 9f7b4967 Guido Trotter
    for job_id in job_ids:
1619 9f7b4967 Guido Trotter
      job = self.SafeLoadJobFromDisk(job_id)
1620 9f7b4967 Guido Trotter
      if job is not None:
1621 6a290889 Guido Trotter
        jobs.append(job.GetInfo(fields))
1622 9f7b4967 Guido Trotter
      elif not list_all:
1623 9f7b4967 Guido Trotter
        jobs.append(None)
1624 e2715f69 Michael Hanselmann
1625 85f03e0d Michael Hanselmann
    return jobs
1626 e2715f69 Michael Hanselmann
1627 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1628 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1629 e2715f69 Michael Hanselmann
  def Shutdown(self):
1630 e2715f69 Michael Hanselmann
    """Stops the job queue.
1631 e2715f69 Michael Hanselmann

1632 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1633 ea03467c Iustin Pop

1634 e2715f69 Michael Hanselmann
    """
1635 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1636 85f03e0d Michael Hanselmann
1637 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
1638 a71f9c7d Guido Trotter
    self._queue_filelock = None