Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ f3044516

History | View | Annotate | Download (49.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 7f93570a Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 f1da30e6 Michael Hanselmann
import errno
35 f1da30e6 Michael Hanselmann
import re
36 f1048938 Iustin Pop
import time
37 5685c1a5 Michael Hanselmann
import weakref
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 a744b676 Manuel Franceschini
from ganeti import netutils
57 989a8bee Michael Hanselmann
from ganeti import compat
58 e2715f69 Michael Hanselmann
59 fbf0262f Michael Hanselmann
60 1daae384 Iustin Pop
# Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
61 58b22b6e Michael Hanselmann
# NOTE(review): presumably the maximum number of jobs stored in a single
# archive directory; the code using it is not visible here -- confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62 e2715f69 Michael Hanselmann
63 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
# NOTE(review): these look like names of instance attributes holding the
# lock (or the object owning it) -- confirm against locking.ssynchronized
64 ebb80afa Guido Trotter
_LOCK = "_lock"
65 ebb80afa Guido Trotter
_QUEUE = "_queue"
66 99bd4f0a Guido Trotter
67 498ae1cc Iustin Pop
68 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to request the cancellation of a job.

  It carries no data of its own; raising it is the whole signal.

  """
72 fbf0262f Michael Hanselmann
73 fbf0262f Michael Hanselmann
74 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the present time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
82 70552c46 Michael Hanselmann
83 70552c46 Michael Hanselmann
84 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Wrapper keeping an opcode together with its execution state.

  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Wraps a fresh opcode which has not been executed yet.

    The opcode starts in the queued status, without result, with an empty
    log and with all timestamps unset.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Rebuilds a _QueuedOpCode from its serialized form.

    The constructor is bypassed; all attributes are taken from C{state}.
    Timestamps missing from C{state} are restored as C{None}.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    restored.status = state["status"]
    restored.result = state["result"]
    restored.log = state["log"]
    # The timestamps are optional in the serialized data
    for name in ("start_timestamp", "exec_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name))
    return restored

  def Serialize(self):
    """Converts this _QueuedOpCode into a plain dictionary.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    serialized = {"input": self.input.__getstate__()}
    serialized["status"] = self.status
    serialized["result"] = self.result
    serialized["log"] = self.log
    serialized["start_timestamp"] = self.start_timestamp
    serialized["exec_timestamp"] = self.exec_timestamp
    serialized["end_timestamp"] = self.end_timestamp
    return serialized
152 f1048938 Iustin Pop
153 e2715f69 Michael Hanselmann
154 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: the highest log entry serial used so far by this job
      (0 if nothing has been logged yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise errors.GenericError: if C{ops} is empty

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    """Returns a debugging representation with job ID and opcode summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    The constructor is bypassed. Timestamps missing from C{state} are
    restored as C{None}, and C{log_serial} is recomputed from the logs of
    the restored opcodes.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The first element of a log entry is its serial; remember the highest
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    In-memory attributes such as C{lock_status} are not included.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - the first opcode found to be canceling, canceled or finished
        with error ends the scan and determines the job status
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status constant, like all the other branches do
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed. The job is passed to the queue's
    C{UpdateJobUnlocked} afterwards, even if marking failed midway.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)
401 34327f51 Iustin Pop
402 f1048938 Iustin Pop
403 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Execution callbacks bound to one opcode of a queued job.

  """
  def __init__(self, queue, job, op):
    """Binds the callbacks to a queue, a job and one of its opcodes.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Aborts via L{CancelJob} if the opcode is in canceling status.

    """
    if self._op.status != constants.OP_STATUS_CANCELING:
      return

    # Somebody asked for the cancellation, do it now
    logging.debug("Canceling opcode")
    raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Switches the opcode from lock-waiting to running.

    The mcpu code invokes this notifier right before the LU's Exec() method
    is started. For the change to be visible to end users, the opcode has to
    be in OP_STATUS_WAITLOCK before Processor.ExecOpCode is called.

    """
    opcode = self._op
    job = self._job

    assert opcode.status in (constants.OP_STATUS_WAITLOCK,
                             constants.OP_STATUS_CANCELING)

    # By now every lock has been acquired, nothing left to report
    job.lock_status = None

    # A cancellation may have been requested in the meantime
    self._CheckCancel()

    logging.debug("Opcode is now running")
    opcode.status = constants.OP_STATUS_RUNNING
    opcode.exec_timestamp = TimeStampNow()

    # Last step: replicate the job status
    self._queue.UpdateJobUnlocked(job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Adds one entry to the opcode log while holding the queue lock.

    The job's log serial is incremented and used for the new entry; the job
    is then updated through the queue with C{replicate=False}.

    """
    job = self._job
    job.log_serial += 1
    entry = (job.log_serial, timestamp, log_type, log_msg)
    self._op.log.append(entry)
    self._queue.UpdateJobUnlocked(job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the entry is logged with
    type ELOG_MESSAGE, or C{(log_type, log_msg)}.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # Splitting the time keeps full precision and serializes more easily
    now = time.time()
    self._AppendFeedback(utils.SplitTime(now), log_type, log_msg)

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # A single assignment, hence the queue lock is not acquired here
    self._job.lock_status = msg

    # A cancellation may have been requested in the meantime
    self._CheckCancel()
498 dc1e2262 Michael Hanselmann
499 031a3e57 Michael Hanselmann
500 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
  """Callable telling whether a job differs from a previously seen state.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Remembers what the client asked for and what it has already seen.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @param prev_job_info: previous job info, as passed by the LUXI client
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._prev_log_serial = prev_log_serial
    self._prev_job_info = prev_job_info
    self._fields = fields

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: C{(job_info, log_entries)} if the job is considered changed,
        C{None} otherwise

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # A round trip through the serializer may alter types (a tuple comes back
    # as a list, for instance) or lose precision. The data sent by the client
    # went through the same treatment, so it is applied here as well;
    # otherwise the comparison below could report differences which are not
    # significant.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # A job which is no longer queued, running or waiting for locks is
    # reported right away; waiting would not bring any further changes.
    pending_states = (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK)
    changed = (status not in pending_states or
               job_info != self._prev_job_info or
               (log_entries and self._prev_log_serial != log_entries[0][0]))

    if not changed:
      return None

    logging.debug("Job %s changed", job.id)
    return (job_info, log_entries)
550 989a8bee Michael Hanselmann
551 989a8bee Michael Hanselmann
552 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
  """Waits for modifications of a single job file using inotify.

  """
  def __init__(self, filename):
    """Creates the inotify notifier and enables the watch on the job file.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = asyncnotifier.SingleFileEventHandler(
      self._wm, self._OnInotify, filename)
    self._notifier = pyinotify.Notifier(
      self._wm, default_proc_fun=self._inotify_handler)

    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically, hence the
      # notifier is stopped explicitly before the error is propagated
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Enables the handler again if it is reported as not being enabled.

    @param notifier_enabled: whether the handler's watch is still enabled

    """
    if notifier_enabled:
      return

    self._inotify_handler.enable()

  def Wait(self, timeout):
    """Checks for inotify events on the job file, waiting up to the timeout.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0

    notifier = self._notifier

    # The timeout is given in seconds, the notifier is passed milliseconds
    got_events = notifier.check_events(timeout * 1000)
    if got_events:
      notifier.read_events()

    # Processing is done unconditionally, also if no new events were read
    notifier.process_events()

    return got_events

  def Close(self):
    """Stops the underlying notifier, thereby closing its file descriptor.

    """
    self._notifier.stop()
600 989a8bee Michael Hanselmann
601 989a8bee Michael Hanselmann
602 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
603 989a8bee Michael Hanselmann
  def __init__(self, filename):
604 989a8bee Michael Hanselmann
    """Initializes this class.
605 989a8bee Michael Hanselmann

606 989a8bee Michael Hanselmann
    @type filename: string
607 989a8bee Michael Hanselmann
    @param filename: Path to job file
608 989a8bee Michael Hanselmann

609 989a8bee Michael Hanselmann
    """
610 989a8bee Michael Hanselmann
    self._filewaiter = None
611 989a8bee Michael Hanselmann
    self._filename = filename
612 6c2549d6 Guido Trotter
613 989a8bee Michael Hanselmann
  def Wait(self, timeout):
614 989a8bee Michael Hanselmann
    """Waits for a job to change.
615 6c2549d6 Guido Trotter

616 989a8bee Michael Hanselmann
    @type timeout: float
617 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
618 989a8bee Michael Hanselmann
    @return: Whether there have been events
619 989a8bee Michael Hanselmann

620 989a8bee Michael Hanselmann
    """
621 989a8bee Michael Hanselmann
    if self._filewaiter:
622 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
623 989a8bee Michael Hanselmann
624 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
625 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
626 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
627 989a8bee Michael Hanselmann
    # race condition.
628 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
629 989a8bee Michael Hanselmann
630 989a8bee Michael Hanselmann
    return True
631 989a8bee Michael Hanselmann
632 989a8bee Michael Hanselmann
  def Close(self):
633 989a8bee Michael Hanselmann
    """Closes underlying waiter.
634 989a8bee Michael Hanselmann

635 989a8bee Michael Hanselmann
    """
636 989a8bee Michael Hanselmann
    if self._filewaiter:
637 989a8bee Michael Hanselmann
      self._filewaiter.Close()
638 989a8bee Michael Hanselmann
639 989a8bee Michael Hanselmann
640 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
641 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
642 989a8bee Michael Hanselmann

643 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
644 989a8bee Michael Hanselmann
  the current job status has changed.
645 989a8bee Michael Hanselmann

646 989a8bee Michael Hanselmann
  """
647 989a8bee Michael Hanselmann
  @staticmethod
648 989a8bee Michael Hanselmann
  def _CheckForChanges(job_load_fn, check_fn):
649 989a8bee Michael Hanselmann
    job = job_load_fn()
650 989a8bee Michael Hanselmann
    if not job:
651 989a8bee Michael Hanselmann
      raise errors.JobLost()
652 989a8bee Michael Hanselmann
653 989a8bee Michael Hanselmann
    result = check_fn(job)
654 989a8bee Michael Hanselmann
    if result is None:
655 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
656 989a8bee Michael Hanselmann
657 989a8bee Michael Hanselmann
    return result
658 989a8bee Michael Hanselmann
659 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
660 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
661 989a8bee Michael Hanselmann
    """Waits for changes on a job.
662 989a8bee Michael Hanselmann

663 989a8bee Michael Hanselmann
    @type filename: string
664 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
665 989a8bee Michael Hanselmann
    @type job_load_fn: callable
666 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
667 989a8bee Michael Hanselmann
    @type fields: list of strings
668 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
669 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
670 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
671 989a8bee Michael Hanselmann
    @type prev_log_serial: int
672 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
673 989a8bee Michael Hanselmann
    @type timeout: float
674 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
675 989a8bee Michael Hanselmann

676 989a8bee Michael Hanselmann
    """
677 6c2549d6 Guido Trotter
    try:
678 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
679 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
680 989a8bee Michael Hanselmann
      try:
681 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
682 989a8bee Michael Hanselmann
                                          job_load_fn, check_fn),
683 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
684 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
685 989a8bee Michael Hanselmann
      finally:
686 989a8bee Michael Hanselmann
        waiter.Close()
687 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
688 6c2549d6 Guido Trotter
      return None
689 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
690 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
691 6c2549d6 Guido Trotter
692 6c2549d6 Guido Trotter
693 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
694 031a3e57 Michael Hanselmann
  """The actual job workers.
695 031a3e57 Michael Hanselmann

696 031a3e57 Michael Hanselmann
  """
697 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
698 e2715f69 Michael Hanselmann
    """Job executor.
699 e2715f69 Michael Hanselmann

700 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
701 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
702 e2715f69 Michael Hanselmann

703 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
704 ea03467c Iustin Pop
    @param job: the job to be processed
705 ea03467c Iustin Pop

706 e2715f69 Michael Hanselmann
    """
707 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
708 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
709 031a3e57 Michael Hanselmann
    queue = job.queue
710 e2715f69 Michael Hanselmann
    try:
711 85f03e0d Michael Hanselmann
      try:
712 85f03e0d Michael Hanselmann
        count = len(job.ops)
713 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
714 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
715 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
716 f6424741 Iustin Pop
            # this is a job that was partially completed before master
717 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
718 f6424741 Iustin Pop
            # are already completed successfully (if any did error
719 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
720 f6424741 Iustin Pop
            # resubmitted for processing)
721 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
722 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
723 f6424741 Iustin Pop
            continue
724 85f03e0d Michael Hanselmann
          try:
725 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726 d21d09d6 Iustin Pop
                         op_summary)
727 85f03e0d Michael Hanselmann
728 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
729 85f03e0d Michael Hanselmann
            try:
730 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
731 e35344b4 Michael Hanselmann
                logging.debug("Canceling opcode")
732 df0fb067 Iustin Pop
                raise CancelJob()
733 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
734 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s waiting for locks",
735 e35344b4 Michael Hanselmann
                            idx + 1, count)
736 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
737 85f03e0d Michael Hanselmann
              op.result = None
738 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
739 c56ec146 Iustin Pop
              if idx == 0: # first opcode
740 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
741 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
742 85f03e0d Michael Hanselmann
743 38206f3c Iustin Pop
              input_opcode = op.input
744 85f03e0d Michael Hanselmann
            finally:
745 85f03e0d Michael Hanselmann
              queue.release()
746 85f03e0d Michael Hanselmann
747 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
748 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
749 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
750 85f03e0d Michael Hanselmann
751 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
752 85f03e0d Michael Hanselmann
            try:
753 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
755 85f03e0d Michael Hanselmann
              op.result = result
756 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
757 6ea72e43 Michael Hanselmann
              if idx == count - 1:
758 6ea72e43 Michael Hanselmann
                job.lock_status = None
759 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
760 963a068b Michael Hanselmann
761 963a068b Michael Hanselmann
                # Consistency check
762 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
763 963a068b Michael Hanselmann
                                  for i in job.ops)
764 963a068b Michael Hanselmann
765 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
766 85f03e0d Michael Hanselmann
            finally:
767 85f03e0d Michael Hanselmann
              queue.release()
768 85f03e0d Michael Hanselmann
769 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
770 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
771 fbf0262f Michael Hanselmann
          except CancelJob:
772 fbf0262f Michael Hanselmann
            # Will be handled further up
773 fbf0262f Michael Hanselmann
            raise
774 85f03e0d Michael Hanselmann
          except Exception, err:
775 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
776 85f03e0d Michael Hanselmann
            try:
777 85f03e0d Michael Hanselmann
              try:
778 e35344b4 Michael Hanselmann
                logging.debug("Opcode %s/%s failed", idx + 1, count)
779 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
780 bcb66fca Iustin Pop
                if isinstance(err, errors.GenericError):
781 599ee321 Iustin Pop
                  to_encode = err
782 bcb66fca Iustin Pop
                else:
783 599ee321 Iustin Pop
                  to_encode = errors.OpExecError(str(err))
784 599ee321 Iustin Pop
                op.result = errors.EncodeException(to_encode)
785 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
786 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
787 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
788 963a068b Michael Hanselmann
789 963a068b Michael Hanselmann
                to_encode = errors.OpExecError("Preceding opcode failed")
790 963a068b Michael Hanselmann
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
791 963a068b Michael Hanselmann
                                      errors.EncodeException(to_encode))
792 963a068b Michael Hanselmann
793 963a068b Michael Hanselmann
                # Consistency check
794 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
795 963a068b Michael Hanselmann
                                  for i in job.ops[:idx])
796 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
797 963a068b Michael Hanselmann
                                  errors.GetEncodedError(i.result)
798 963a068b Michael Hanselmann
                                  for i in job.ops[idx:])
799 85f03e0d Michael Hanselmann
              finally:
800 6ea72e43 Michael Hanselmann
                job.lock_status = None
801 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
802 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
803 85f03e0d Michael Hanselmann
            finally:
804 85f03e0d Michael Hanselmann
              queue.release()
805 85f03e0d Michael Hanselmann
            raise
806 85f03e0d Michael Hanselmann
807 fbf0262f Michael Hanselmann
      except CancelJob:
808 3c0d60d0 Guido Trotter
        queue.acquire(shared=1)
809 fbf0262f Michael Hanselmann
        try:
810 39ed3a98 Guido Trotter
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
811 39ed3a98 Guido Trotter
                                "Job canceled by request")
812 6ea72e43 Michael Hanselmann
          job.lock_status = None
813 6ea72e43 Michael Hanselmann
          job.end_timestamp = TimeStampNow()
814 6ea72e43 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
815 fbf0262f Michael Hanselmann
        finally:
816 fbf0262f Michael Hanselmann
          queue.release()
817 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
818 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
819 85f03e0d Michael Hanselmann
      except:
820 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
821 e2715f69 Michael Hanselmann
    finally:
822 6ea72e43 Michael Hanselmann
      status = job.CalcStatus()
823 6ea72e43 Michael Hanselmann
      logging.info("Finished job %s, status = %s", job.id, status)
824 e2715f69 Michael Hanselmann
825 e2715f69 Michael Hanselmann
826 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool processing the jobs of a job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and keeps a reference to the job queue.

    @param queue: the job queue this pool works for; made available to the
        workers as C{queue} attribute of the pool

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__("JobQueue", JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
835 e2715f69 Michael Hanselmann
836 e2715f69 Michael Hanselmann
837 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
838 6c881c52 Iustin Pop
  """Decorator for "public" functions.
839 ea03467c Iustin Pop

840 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
841 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
842 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
843 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
844 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
845 ea03467c Iustin Pop

846 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
847 f1da30e6 Michael Hanselmann

848 6c881c52 Iustin Pop
  Example::
849 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
850 6c881c52 Iustin Pop
    @_RequireOpenQueue
851 6c881c52 Iustin Pop
    def Example(self):
852 6c881c52 Iustin Pop
      pass
853 db37da70 Michael Hanselmann

854 6c881c52 Iustin Pop
  """
855 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
856 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
857 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
858 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
859 6c881c52 Iustin Pop
  return wrapper
860 db37da70 Michael Hanselmann
861 db37da70 Michael Hanselmann
862 6c881c52 Iustin Pop
class JobQueue(object):
863 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
864 db37da70 Michael Hanselmann

865 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
866 6c881c52 Iustin Pop

867 6c881c52 Iustin Pop
  """
868 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
869 db37da70 Michael Hanselmann
870 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the job queue object and then goes through the jobs currently
    on disk: jobs which were queued are handed to the worker pool, jobs
    which were already running get their unfinished opcodes marked as
    failed.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. Code acquiring it in shared mode must be safe to
    # run concurrently with all other code acquiring it in shared mode,
    # including itself. Code not acquiring it at all must be safe to run
    # concurrently with all code acquiring it in shared mode and all code
    # acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    # The lock operations are made available directly on the queue
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes; only master candidates are kept
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        job_ids = self._GetJobIDsUnlocked()
        total = len(job_ids)
        final_idx = total - 1
        last_report = time.time()

        # Job states found on disk for jobs which had already been started
        unfinished = (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING)

        for idx, job_id in enumerate(job_ids):
          # Give an update every 1000 jobs or 10 seconds, and for the last job
          if (idx % 1000 == 0 or idx == final_idx or
              time.time() >= (last_report + 10.0)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, final_idx, 100.0 * (idx + 1) / total)
            last_report = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status == constants.JOB_STATUS_QUEUED:
            self._wpool.AddTask((job, ))
          elif status in unfinished:
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Do not leave worker threads behind if the inspection failed
      self._wpool.TerminateWorkers()
      raise
964 85f03e0d Michael Hanselmann
965 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is a master
    candidate, all job files and the serial file are uploaded to it and the
    node is added to the list of nodes; otherwise it is removed from that
    list.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # Drop the node if it was known (a missing entry is not an error);
      # no job files are replicated to it
      self._nodes.pop(name, None)
      return

    # Upload the whole queue excluding archived jobs, followed by the
    # current serial file
    upload_files = [self._GetJobPath(job_id)
                    for job_id in self._GetJobIDsUnlocked()]
    upload_files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload_files:
      data = utils.ReadFile(path)

      reply = rpc.RpcRunner.call_jobqueue_update([name], [node.primary_ip],
                                                 path, data)
      upload_error = reply[name].fail_msg
      if upload_error:
        # A failed upload is logged only; the remaining files are still sent
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    self._nodes[name] = node.primary_ip
1009 d2e03a33 Michael Hanselmann
1010 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Removing a node which is not known to the queue is not an error.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      # Node was not registered with the queue, nothing to do
      pass
1020 23752136 Michael Hanselmann
1021 7e950d31 Iustin Pop
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, an error is logged for every node on which the RPC
    failed, and another one if more than half of the nodes failed.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    num_failed = 0
    num_success = 0

    for node in nodes:
      node_result = result[node]
      msg = node_result.fail_msg
      if not msg:
        num_success += 1
        continue

      num_failed += 1
      logging.error("RPC call %s (%s) failed on node %s: %s",
                    node_result.call, failmsg, node, msg)

    # The master node is not part of the list; count it as a success
    if num_failed > (num_success + 1):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
1052 e74798c1 Michael Hanselmann
1053 99aabbed Iustin Pop
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses, both in
        the same order

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? (This
    # would need special handling for an empty node dictionary.)
    names = []
    addrs = []
    for (name, addr) in self._nodes.items():
      names.append(name)
      addrs.append(addr)
    return names, addrs
1065 99aabbed Iustin Pop
1066 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and optionally pushes it to the other nodes.

    The local copy is always written first. Only when C{replicate} is
    true is the same content sent to the nodes returned by
    L{_GetNodeIp}; the RPC result is then handed to L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be written/replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if not replicate:
      # Local-only update requested
      return

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
1086 23752136 Michael Hanselmann
1087 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the renames.

    Every (old, new) pair is first renamed on the local node (creating
    the target directory if needed), after which the whole list is sent
    to the other nodes in a single RPC call.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames come first
    for (src_path, dst_path) in rename:
      utils.RenameFile(src_path, dst_path, mkdir=True)

    # Then the same list of renames is pushed to all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
1105 abc1f2ce Michael Hanselmann
1106 7e950d31 Iustin Pop
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or
        is negative

    """
    if not isinstance(job_id, (int, long)):
      # The value is wrapped in a one-element tuple on purpose: if a tuple
      # were passed in by mistake, "%" would otherwise try to unpack it and
      # fail with TypeError instead of the intended ProgrammerError
      raise errors.ProgrammerError("Job ID '%s' not numeric" % (job_id, ))
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
1126 85f03e0d Michael Hanselmann
1127 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into directories by integer-dividing the numeric
    job ID by C{JOBS_PER_ARCHIVE_DIRECTORY}.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: "/" only truncates under classic Python 2
    # integer division and would yield a float (and a name like "0.0123")
    # with "from __future__ import division" or a newer interpreter
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1138 58b22b6e Michael Hanselmann
1139 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    The identifiers handed out start at the current value of
    C{self._last_serial}; the counter is advanced by C{count} and the
    new counter value is written to the serial file (and replicated)
    before the in-memory counter is updated.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} formatted job identifiers

    """
    assert count > 0
    # New counter value
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    # range() excludes its upper bound, so this yields exactly "count"
    # values. The previous upper bound of "serial + 1" produced one
    # identifier too many; callers only ever consumed the first "count"
    # entries, hence the identifiers actually handed out are unchanged.
    result = [self._FormatJobID(v)
              for v in range(self._last_serial, serial)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1164 f1da30e6 Michael Hanselmann
1165 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Builds the path of the queue file belonging to a job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file, i.e. C{job-<id>} inside the
        queue directory

    """
    job_file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, job_file_name)
1176 f1da30e6 Michael Hanselmann
1177 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Builds the path of the archived file belonging to a job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file, located in the
        sub-directory returned by L{_GetArchiveDirectory}

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    job_file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          archive_subdir, job_file_name)
1189 0cb94105 Michael Hanselmann
1190 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    Only visible files in the queue directory whose name matches
    C{self._RE_JOB_FILE} are considered; the ID is the first group of
    that match.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    name_matches = [self._RE_JOB_FILE.match(name)
                    for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [match.group(1) for match in name_matches if match]

    if sort:
      return utils.NiceSort(job_ids)

    return job_ids
1211 911a495b Iustin Pop
1212 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1213 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1214 ea03467c Iustin Pop

1215 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1216 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1217 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1218 ea03467c Iustin Pop

1219 ea03467c Iustin Pop
    @param job_id: the job id
1220 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1221 ea03467c Iustin Pop
    @return: either None or the job object
1222 ea03467c Iustin Pop

1223 ea03467c Iustin Pop
    """
1224 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1225 5685c1a5 Michael Hanselmann
    if job:
1226 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1227 5685c1a5 Michael Hanselmann
      return job
1228 ac0930b9 Iustin Pop
1229 3d6c5566 Guido Trotter
    try:
1230 3d6c5566 Guido Trotter
      job = self._LoadJobFromDisk(job_id)
1231 aa9f8167 Iustin Pop
      if job is None:
1232 aa9f8167 Iustin Pop
        return job
1233 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1234 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1235 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1236 3d6c5566 Guido Trotter
      if old_path == new_path:
1237 3d6c5566 Guido Trotter
        # job already archived (future case)
1238 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1239 3d6c5566 Guido Trotter
      else:
1240 3d6c5566 Guido Trotter
        # non-archived case
1241 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1242 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1243 3d6c5566 Guido Trotter
      return None
1244 162c8636 Guido Trotter
1245 162c8636 Guido Trotter
    self._memcache[job_id] = job
1246 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1247 162c8636 Guido Trotter
    return job
1248 162c8636 Guido Trotter
1249 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1250 162c8636 Guido Trotter
    """Load the given job file from disk.
1251 162c8636 Guido Trotter

1252 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1253 162c8636 Guido Trotter

1254 162c8636 Guido Trotter
    @type job_id: string
1255 162c8636 Guido Trotter
    @param job_id: job identifier
1256 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1257 162c8636 Guido Trotter
    @return: either None or the job object
1258 162c8636 Guido Trotter

1259 162c8636 Guido Trotter
    """
1260 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1261 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1262 f1da30e6 Michael Hanselmann
    try:
1263 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1264 162c8636 Guido Trotter
    except EnvironmentError, err:
1265 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1266 f1da30e6 Michael Hanselmann
        return None
1267 f1da30e6 Michael Hanselmann
      raise
1268 13998ef2 Michael Hanselmann
1269 94ed59a5 Iustin Pop
    try:
1270 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1271 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1272 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1273 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1274 94ed59a5 Iustin Pop
1275 ac0930b9 Iustin Pop
    return job
1276 f1da30e6 Michael Hanselmann
1277 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk without raising on bad files.

    This wraps L{_LoadJobFromDisk}: a corrupted job file or an
    environment error while reading it is logged (with traceback) and
    reported to the caller as C{None}.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job
1295 0f9c08dc Guido Trotter
1296 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining, i.e. the
        drain file exists

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
1308 686d7433 Iustin Pop
1309 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1310 20571a26 Guido Trotter
    """Update the queue size.
1311 20571a26 Guido Trotter

1312 20571a26 Guido Trotter
    """
1313 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1314 20571a26 Guido Trotter
1315 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets or clears the drain flag for the queue.

    The flag is represented by the (empty) drain file on disk and is
    mirrored in C{self._drained}.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE

    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      utils.WriteFile(drain_file, data="", close=True)

    # Keep the in-memory copy in sync with the file
    self._drained = drain_flag

    return True
1332 3ccafd0e Iustin Pop
1333 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    The job object is created, written to disk through
    L{UpdateJobUnlocked}, counted in the cached queue size and added
    to the memory cache. Handing the job to the worker pool is left to
    the caller.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    queue_is_full = (self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT)
    if queue_is_full:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # Persist the job before accounting for it
    self.UpdateJobUnlocked(new_job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    return new_job
1369 f1da30e6 Michael Hanselmann
1370 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job, then hand it to the worker pool.

    @see: L{_SubmitJobUnlocked}

    @type ops: list
    @param ops: the opcodes making up the new job
    @return: the ID assigned to the new job

    """
    new_ids = self._NewSerialsUnlocked(1)
    job_id = new_ids[0]

    queued_job = self._SubmitJobUnlocked(job_id, ops)
    self._wpool.AddTask((queued_job, ))

    return job_id
1381 2971c913 Iustin Pop
1382 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1383 2971c913 Iustin Pop
  @_RequireOpenQueue
1384 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1385 2971c913 Iustin Pop
    """Create and store multiple jobs.
1386 2971c913 Iustin Pop

1387 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1388 2971c913 Iustin Pop

1389 2971c913 Iustin Pop
    """
1390 2971c913 Iustin Pop
    results = []
1391 7beb1e53 Guido Trotter
    tasks = []
1392 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1393 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1394 2971c913 Iustin Pop
      try:
1395 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1396 2971c913 Iustin Pop
        status = True
1397 7beb1e53 Guido Trotter
        data = job_id
1398 2971c913 Iustin Pop
      except errors.GenericError, err:
1399 2971c913 Iustin Pop
        data = str(err)
1400 2971c913 Iustin Pop
        status = False
1401 2971c913 Iustin Pop
      results.append((status, data))
1402 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1403 2971c913 Iustin Pop
1404 2971c913 Iustin Pop
    return results
1405 2971c913 Iustin Pop
1406 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Writes a job's current state to its on-disk file.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and (unless disabled) replicate
    them to the other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)

    logging.debug("Writing job %s to %s", job.id, job_file)
    self._UpdateJobQueueFile(job_file, serialized, replicate)
1424 ac0930b9 Iustin Pop
1425 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual waiting is delegated to L{_WaitForJobChangesHelper},
    which is given the job's file path and a function loading the job
    from disk via L{SafeLoadJobFromDisk}.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_job_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
    wait_helper = _WaitForJobChangesHelper()
    job_file = self._GetJobPath(job_id)

    return wait_helper(job_file, load_job_fn, fields, prev_job_info,
                       prev_log_serial, timeout)
1455 dfe57c22 Michael Hanselmann
1456 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet, i.e. it is
    either still queued or waiting for its locks.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: C{(success, message)}, with C{success} a boolean and
        C{message} a human-readable description of the outcome

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      # Not picked up yet, the opcodes can be marked as canceled right away
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    if status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

    # Any other status means the job can't be canceled anymore
    logging.debug("Job %s is no longer waiting in the queue", job.id)
    return (False, "Job %s is no longer waiting in the queue" % job.id)
1490 fbf0262f Michael Hanselmann
1491 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Only jobs whose status is in C{constants.JOBS_FINALIZED} are
    archived; the others are skipped with a debug message. The job
    files are moved to their archive location in a single rename
    operation.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    finalized_jobs = []
    renames = []

    for job in jobs:
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        finalized_jobs.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in finalized_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()

    return len(finalized_jobs)
1526 78d12585 Michael Hanselmann
1527 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived_count = self._ArchiveJobsUnlocked([job])
      return archived_count == 1

    logging.debug("Job %s not found", job_id)
    return False
1548 07cd723a Iustin Pop
1549 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and if that is missing too, the
    received timestamp. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    Jobs are examined in the order returned by L{_GetJobIDsUnlocked}
    and archived in batches of ten. Examination stops once more than
    C{timeout} seconds have passed since the call started.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: how long, in seconds, new jobs may be examined
    @rtype: tuple
    @return: C{(archived, remaining)}: the number of jobs archived and
        the number of job IDs which were not reached before the time
        limit expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Updated before the timeout check, so the ID at which the loop stops
      # is counted as touched as well
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        # Use the most specific timestamp available: end, start, received
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        # Only the first element of the timestamp is compared
        # (presumably the seconds part -- verify against _QueuedJob)
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    # Flush the last, incomplete batch
    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1604 07cd723a Iustin Pop
1605 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
    """Returns the requested fields for a list of jobs.

    If no job IDs are given, all jobs found on disk are queried and
    jobs which can't be loaded are silently left out. If explicit IDs
    are given, a job which can't be loaded is reported as C{None} at
    its position in the result.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    list_all = not job_ids
    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    result = []
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is None:
        # Unloadable jobs are only reported for explicitly requested IDs
        if not list_all:
          result.append(None)
        continue
      result.append(job.GetInfo(fields))

    return result
1633 e2715f69 Michael Hanselmann
1634 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1635 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1636 e2715f69 Michael Hanselmann
  def Shutdown(self):
1637 e2715f69 Michael Hanselmann
    """Stops the job queue.
1638 e2715f69 Michael Hanselmann

1639 ea03467c Iustin Pop
    This shuts down all the worker threads and closes the queue.
1640 ea03467c Iustin Pop

1641 e2715f69 Michael Hanselmann
    """
1642 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1643 85f03e0d Michael Hanselmann
1644 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
1645 a71f9c7d Guido Trotter
    self._queue_filelock = None