Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 6c2549d6

History | View | Annotate | Download (45.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 e2715f69 Michael Hanselmann
57 fbf0262f Michael Hanselmann
58 1daae384 Iustin Pop
# Number of worker threads started for processing jobs.
JOBQUEUE_THREADS = 25
# NOTE(review): presumably the number of archived jobs kept per archive
# directory; the code using this constant is outside this section.
JOBS_PER_ARCHIVE_DIRECTORY = 10000
60 e2715f69 Michael Hanselmann
61 498ae1cc Iustin Pop
62 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised by L{_OpExecCallbacks.NotifyStart} when the opcode that is about
  to be executed has been put into the canceling state.

  """
66 fbf0262f Michael Hanselmann
67 fbf0262f Michael Hanselmann
68 70552c46 Michael Hanselmann
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
76 70552c46 Michael Hanselmann
77 70552c46 Michael Hanselmann
78 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    The new object starts in the C{OP_STATUS_QUEUED} state with no result,
    an empty log and all timestamps unset.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The keys C{input}, C{status}, C{result} and C{log} are mandatory; the
    three timestamp keys are optional and default to C{None}.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # __init__ is bypassed on purpose: every slot is filled from the state
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp")
    obj.exec_timestamp = state.get("exec_timestamp")
    obj.end_timestamp = state.get("end_timestamp")
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    The encapsulated opcode is serialized through its C{__getstate__}
    method; all other attributes are stored as they are.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
146 f1048938 Iustin Pop
147 e2715f69 Michael Hanselmann
148 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: the serial of the most recent log entry (0 if the
      job has no log entries yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise errors.GenericError: if no opcodes were given

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes; it shares the queue's lock
    self.change = threading.Condition(self.queue._lock)

  def __repr__(self):
    """Returns a debugging representation with the job ID and op summaries.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # __init__ is bypassed on purpose: all slots are filled from the state
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp")
    obj.start_timestamp = state.get("start_timestamp")
    obj.end_timestamp = state.get("end_timestamp")

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The job-wide serial is the highest serial found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes; it shares the queue's lock
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    Only the ID, the opcodes and the three timestamps are serialized;
    the in-memory attributes are not part of the result.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceling, cancelled, or finished with error opcode,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report the job-level constant, like all the other branches
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(entry for entry in op.log if entry[0] > serial)

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are only expected before the first unfinished one
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
399 34327f51 Iustin Pop
400 f1048938 Iustin Pop
401 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks tying the execution of one opcode to its job and queue.

  The methods update the status, execution timestamp, log and lock
  information of the opcode/job given to the constructor.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode has been marked as canceling

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either a single argument (the message, logged with type
    C{constants.ELOG_MESSAGE}) or two arguments (log type and message).
    The entry is appended to the opcode's log, the job is written through
    the queue without replication and waiters on the job's C{change}
    condition are notified.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
      self._queue.UpdateJobUnlocked(self._job, replicate=False)

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
481 ef2df7d3 Michael Hanselmann
482 031a3e57 Michael Hanselmann
483 6c2549d6 Guido Trotter
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  @type job_id: string
  @ivar job_id: id of the job we're watching
  @ivar fields: names of the job fields to report, as accepted by
      L{_QueuedJob.GetInfo}
  @ivar prev_job_info: previous job info, as passed by the luxi client;
      compared against the job info loaded from disk
  @type prev_log_serial: None or int
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
  @type queue: L{JobQueue}
  @ivar queue: job queue (used for a few utility functions)
  @type job_path: string
  @ivar job_path: absolute path of the job file
  @type wm: pyinotify.WatchManager (or None)
  @ivar wm: inotify watch manager to watch for changes
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
  @ivar inotify_handler: single file event handler, used for watching
  @type notifier: pyinotify.Notifier
  @ivar notifier: inotify single-threaded notifier, used for watching
  @ivar job_status: job status computed by the last disk load (None before)
  @ivar job_info: job info computed by the last disk load (None before)
  @ivar log_entries: log entries found by the last disk load (None before)

  """

  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
    """Stores the parameters; the inotify objects are created lazily.

    """
    self.job_id = job_id
    self.fields = fields
    self.prev_job_info = prev_job_info
    self.prev_log_serial = prev_log_serial
    self.queue = queue
    # pylint: disable-msg=W0212
    self.job_path = self.queue._GetJobPath(self.job_id)
    self.wm = None
    self.inotify_handler = None
    self.notifier = None
    # Filled in by _LoadDiskStatus
    self.job_status = None
    self.job_info = None
    self.log_entries = None

  def _SetupInotify(self):
    """Create the inotify watch manager, handler and notifier.

    Does nothing if the watch manager already exists.

    @raise errors.InotifyError: if the notifier cannot be setup

    """
    if self.wm:
      return
    self.wm = pyinotify.WatchManager()
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
                                                                self.OnInotify,
                                                                self.job_path)
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
    self.inotify_handler.enable()

  def _LoadDiskStatus(self):
    """Loads the job from disk and refreshes status, info and log entries.

    @raise errors.JobLost: if the job cannot be loaded

    """
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
    if not job:
      raise errors.JobLost()
    self.job_status = job.CalcStatus()

    job_info = job.GetInfo(self.fields)
    log_entries = job.GetLogEntries(self.prev_log_serial)
    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

  def _CheckForChanges(self):
    """Reloads the job and reports whether it changed.

    @rtype: tuple
    @return: C{(job_info, log_entries)} if the job is no longer queued,
        running or waiting for locks, if its info differs from the previous
        one, or if there are new log entries
    @raise utils.RetryAgain: if nothing changed

    """
    self._LoadDiskStatus()
    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
                                constants.JOB_STATUS_RUNNING,
                                constants.JOB_STATUS_WAITLOCK) or
        self.prev_job_info != self.job_info or
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
      logging.debug("Job %s changed", self.job_id)
      return (self.job_info, self.log_entries)

    raise utils.RetryAgain()

  def OnInotify(self, notifier_enabled):
    """Callback given to the file event handler; re-enables it if needed.

    @param notifier_enabled: whether the handler is still enabled

    """
    if not notifier_enabled:
      self.inotify_handler.enable()

  def WaitFn(self, timeout):
    """Waits for inotify events; used as wait function for L{utils.Retry}.

    @param timeout: how long to wait for events; multiplied by 1000 before
        being passed to the notifier's C{check_events}

    """
    self._SetupInotify()
    if self.notifier.check_events(timeout*1000):
      self.notifier.read_events()
    self.notifier.process_events()

  def WaitForChanges(self, timeout):
    """Waits until the job changes or the timeout expires.

    @param timeout: maximum time to wait, passed on to L{utils.Retry}
    @return: the C{(job_info, log_entries)} tuple if the job changed,
        C{None} if inotify failed or the job was lost, and
        C{constants.JOB_NOTCHANGED} if the timeout expired

    """
    try:
      return utils.Retry(self._CheckForChanges,
                         utils.RETRY_REMAINING_TIME,
                         timeout,
                         wait_fn=self.WaitFn)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  def Close(self):
    """Stops the notifier, if one was created.

    The inotify objects are dropped afterwards, so calling this method
    again is a no-op.

    """
    # Check the notifier itself: the watch manager may exist without it if
    # the setup failed half-way
    if self.notifier is not None:
      self.notifier.stop()
      self.notifier = None
      self.inotify_handler = None
      self.wm = None
591 6c2549d6 Guido Trotter
592 6c2549d6 Guido Trotter
593 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
594 031a3e57 Michael Hanselmann
  """The actual job workers.
595 031a3e57 Michael Hanselmann

596 031a3e57 Michael Hanselmann
  """
597 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
598 e2715f69 Michael Hanselmann
    """Job executor.
599 e2715f69 Michael Hanselmann

600 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
601 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
602 e2715f69 Michael Hanselmann

603 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
604 ea03467c Iustin Pop
    @param job: the job to be processed
605 ea03467c Iustin Pop

606 e2715f69 Michael Hanselmann
    """
607 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
608 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
609 031a3e57 Michael Hanselmann
    queue = job.queue
610 e2715f69 Michael Hanselmann
    try:
611 85f03e0d Michael Hanselmann
      try:
612 85f03e0d Michael Hanselmann
        count = len(job.ops)
613 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
614 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
615 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
616 f6424741 Iustin Pop
            # this is a job that was partially completed before master
617 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
618 f6424741 Iustin Pop
            # are already completed successfully (if any did error
619 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
620 f6424741 Iustin Pop
            # resubmitted for processing)
621 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
622 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
623 f6424741 Iustin Pop
            continue
624 85f03e0d Michael Hanselmann
          try:
625 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
626 d21d09d6 Iustin Pop
                         op_summary)
627 85f03e0d Michael Hanselmann
628 85f03e0d Michael Hanselmann
            queue.acquire()
629 85f03e0d Michael Hanselmann
            try:
630 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
631 df0fb067 Iustin Pop
                raise CancelJob()
632 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
633 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
634 85f03e0d Michael Hanselmann
              op.result = None
635 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
636 c56ec146 Iustin Pop
              if idx == 0: # first opcode
637 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
638 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
639 85f03e0d Michael Hanselmann
640 38206f3c Iustin Pop
              input_opcode = op.input
641 85f03e0d Michael Hanselmann
            finally:
642 85f03e0d Michael Hanselmann
              queue.release()
643 85f03e0d Michael Hanselmann
644 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
645 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
646 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
647 85f03e0d Michael Hanselmann
648 85f03e0d Michael Hanselmann
            queue.acquire()
649 85f03e0d Michael Hanselmann
            try:
650 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
651 85f03e0d Michael Hanselmann
              op.result = result
652 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
653 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
654 85f03e0d Michael Hanselmann
            finally:
655 85f03e0d Michael Hanselmann
              queue.release()
656 85f03e0d Michael Hanselmann
657 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
658 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
659 fbf0262f Michael Hanselmann
          except CancelJob:
660 fbf0262f Michael Hanselmann
            # Will be handled further up
661 fbf0262f Michael Hanselmann
            raise
662 85f03e0d Michael Hanselmann
          except Exception, err:
663 85f03e0d Michael Hanselmann
            queue.acquire()
664 85f03e0d Michael Hanselmann
            try:
665 85f03e0d Michael Hanselmann
              try:
666 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
667 bcb66fca Iustin Pop
                if isinstance(err, errors.GenericError):
668 bcb66fca Iustin Pop
                  op.result = errors.EncodeException(err)
669 bcb66fca Iustin Pop
                else:
670 bcb66fca Iustin Pop
                  op.result = str(err)
671 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
672 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
673 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
674 85f03e0d Michael Hanselmann
              finally:
675 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
676 85f03e0d Michael Hanselmann
            finally:
677 85f03e0d Michael Hanselmann
              queue.release()
678 85f03e0d Michael Hanselmann
            raise
679 85f03e0d Michael Hanselmann
680 fbf0262f Michael Hanselmann
      except CancelJob:
681 fbf0262f Michael Hanselmann
        queue.acquire()
682 fbf0262f Michael Hanselmann
        try:
683 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
684 fbf0262f Michael Hanselmann
        finally:
685 fbf0262f Michael Hanselmann
          queue.release()
686 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
687 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
688 85f03e0d Michael Hanselmann
      except:
689 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
690 e2715f69 Michael Hanselmann
    finally:
691 85f03e0d Michael Hanselmann
      queue.acquire()
692 85f03e0d Michael Hanselmann
      try:
693 65548ed5 Michael Hanselmann
        try:
694 ef2df7d3 Michael Hanselmann
          job.lock_status = None
695 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
696 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
697 65548ed5 Michael Hanselmann
        finally:
698 65548ed5 Michael Hanselmann
          job_id = job.id
699 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
700 85f03e0d Michael Hanselmann
      finally:
701 85f03e0d Michael Hanselmann
        queue.release()
702 ef2df7d3 Michael Hanselmann
703 02fc74da Michael Hanselmann
      logging.info("Finished job %s, status = %s", job_id, status)
704 e2715f69 Michael Hanselmann
705 e2715f69 Michael Hanselmann
706 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} instances for a job queue.

  """
  def __init__(self, queue):
    """Creates the pool and remembers the queue it works for.

    The pool is named "JobQueue" and is created with JOBQUEUE_THREADS and
    the L{_JobQueueWorker} class.

    @param queue: the job queue; stored as C{self.queue}

    """
    pool_name = "JobQueue"
    super(_JobQueueWorkerPool, self).__init__(pool_name, JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
715 e2715f69 Michael Hanselmann
716 e2715f69 Michael Hanselmann
717 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
718 6c881c52 Iustin Pop
  """Decorator for "public" functions.
719 ea03467c Iustin Pop

720 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
721 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
722 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
723 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
724 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
725 ea03467c Iustin Pop

726 6c881c52 Iustin Pop
  @warning: Use this decorator only after utils.LockedMethod!
727 f1da30e6 Michael Hanselmann

728 6c881c52 Iustin Pop
  Example::
729 6c881c52 Iustin Pop
    @utils.LockedMethod
730 6c881c52 Iustin Pop
    @_RequireOpenQueue
731 6c881c52 Iustin Pop
    def Example(self):
732 6c881c52 Iustin Pop
      pass
733 db37da70 Michael Hanselmann

734 6c881c52 Iustin Pop
  """
735 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
736 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
737 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
738 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
739 6c881c52 Iustin Pop
  return wrapper
740 db37da70 Michael Hanselmann
741 db37da70 Michael Hanselmann
742 6c881c52 Iustin Pop
class JobQueue(object):
743 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
744 db37da70 Michael Hanselmann

745 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
746 6c881c52 Iustin Pop

747 6c881c52 Iustin Pop
  """
748 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
749 db37da70 Michael Hanselmann
750 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the queue lock, acquires the queue file lock, reads the serial
    number and the list of master candidates, creates the worker pool and
    then inspects all jobs found on disk: queued jobs are handed to the
    worker pool, while jobs found running, waiting for locks or being
    canceled have their unfinished opcodes marked as failed. If the
    inspection fails, the workers are terminated and the error is
    re-raised.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (master candidates only)
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        job_ids = self._GetJobIDsUnlocked()
        total = len(job_ids)
        last_report = time.time()
        for pos, job_id in enumerate(job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (pos % 1000 == 0 or time.time() >= (last_report + 10.0) or
              pos == (total - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         pos, total - 1, 100.0 * (pos + 1) / total)
            last_report = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          job_status = job.CalcStatus()

          if job_status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif job_status in (constants.JOB_STATUS_RUNNING,
                              constants.JOB_STATUS_WAITLOCK,
                              constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise
842 85f03e0d Michael Hanselmann
843 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
844 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
845 99aabbed Iustin Pop
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A node which is not a
    master candidate is then dropped from the list of known nodes and
    receives nothing else. A master candidate gets all current job files
    and the serial file uploaded before being added to the list of known
    nodes. Failures of the remote calls are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    to_upload = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Read file content
      data = utils.ReadFile(path)

      upload_result = rpc.RpcRunner.call_jobqueue_update([name],
                                                         [node.primary_ip],
                                                         path, data)
      upload_msg = upload_result[name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_msg)

    self._nodes[name] = node.primary_ip
887 d2e03a33 Michael Hanselmann
888 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
889 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
890 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    The node is dropped from the list of known nodes; unknown names are
    silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    if node_name in self._nodes:
      del self._nodes[node_name]
898 23752136 Michael Hanselmann
899 7e950d31 Iustin Pop
  @staticmethod
900 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
901 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
902 ea03467c Iustin Pop

903 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
904 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
905 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
906 ea03467c Iustin Pop

907 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
908 ea03467c Iustin Pop
    @type nodes: list
909 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
910 ea03467c Iustin Pop
    @type failmsg: str
911 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
912 ea03467c Iustin Pop

913 ea03467c Iustin Pop
    """
914 e74798c1 Michael Hanselmann
    failed = []
915 e74798c1 Michael Hanselmann
    success = []
916 e74798c1 Michael Hanselmann
917 e74798c1 Michael Hanselmann
    for node in nodes:
918 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
919 c8457ce7 Iustin Pop
      if msg:
920 e74798c1 Michael Hanselmann
        failed.append(node)
921 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
922 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
923 c8457ce7 Iustin Pop
      else:
924 c8457ce7 Iustin Pop
        success.append(node)
925 e74798c1 Michael Hanselmann
926 e74798c1 Michael Hanselmann
    # +1 for the master node
927 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
928 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
929 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
930 e74798c1 Michael Hanselmann
931 99aabbed Iustin Pop
  def _GetNodeIp(self):
932 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
933 99aabbed Iustin Pop

934 ea03467c Iustin Pop
    @rtype: (list, list)
935 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
936 ea03467c Iustin Pop
        names and the second one with the node addresses
937 ea03467c Iustin Pop

938 99aabbed Iustin Pop
    """
939 99aabbed Iustin Pop
    name_list = self._nodes.keys()
940 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
941 99aabbed Iustin Pop
    return name_list, addr_list
942 99aabbed Iustin Pop
943 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and optionally replicates it to all nodes.

    The local file is always (re)written; the upload to the other known
    nodes, including the check of its results, only happens if
    C{replicate} is set.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if not replicate:
      return

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
963 23752136 Michael Hanselmann
964 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are first done in the local queue directory (creating
    target directories as needed) and are then sent to all the other
    nodes we have in a single call.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for (old_name, new_name) in rename:
      utils.RenameFile(old_name, new_name, mkdir=True)

    # ... and on all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
982 abc1f2ce Michael Hanselmann
983 7e950d31 Iustin Pop
  @staticmethod
984 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    The ID is validated and then passed through C{str()}; should the job
    id format ever change, this is the place abstracting the change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the ID is not an integer or is
        negative

    """
    is_numeric = isinstance(job_id, (int, long))
    if not is_numeric:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    elif job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    formatted = str(job_id)
    return formatted
1003 85f03e0d Michael Hanselmann
1004 58b22b6e Michael Hanselmann
  @classmethod
1005 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into directories of JOBS_PER_ARCHIVE_DIRECTORY jobs
    each; the directory name is the index of the group the job falls
    into.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: same result as "/" on integers, but does not
    # silently turn into a float should true division ever be in effect
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1015 58b22b6e Michael Hanselmann
1016 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    serial is written to the serial file (and replicated) before the
    in-memory counter is advanced, so a failed write does not consume any
    identifiers.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} job identifiers, in increasing
        order

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    # The allocated identifiers are the "count" numbers following the last
    # serial; starting at the last serial itself would return one identifier
    # too many and hand out the highest one again on the next call
    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    assert len(result) == count

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
1041 f1da30e6 Michael Hanselmann
1042 85f03e0d Michael Hanselmann
  @staticmethod
1043 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file, located in the queue directory

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1053 f1da30e6 Michael Hanselmann
1054 58b22b6e Michael Hanselmann
  @classmethod
1055 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file, i.e. below the job's
        archive directory inside the queue archive directory

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                          file_name)
1066 0cb94105 Michael Hanselmann
1067 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    Only the queue directory on disk is examined, because it's a
    requirement that all jobs are present on disk (so the _memcache can't
    hold any extra IDs). Files not matching the job file name pattern are
    ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = [self._RE_JOB_FILE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]
    if not sort:
      return job_ids
    return utils.NiceSort(job_ids)
1088 911a495b Iustin Pop
1089 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1090 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1091 ea03467c Iustin Pop

1092 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1093 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1094 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1095 ea03467c Iustin Pop

1096 ea03467c Iustin Pop
    @param job_id: the job id
1097 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1098 ea03467c Iustin Pop
    @return: either None or the job object
1099 ea03467c Iustin Pop

1100 ea03467c Iustin Pop
    """
1101 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1102 5685c1a5 Michael Hanselmann
    if job:
1103 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1104 5685c1a5 Michael Hanselmann
      return job
1105 ac0930b9 Iustin Pop
1106 3d6c5566 Guido Trotter
    try:
1107 3d6c5566 Guido Trotter
      job = self._LoadJobFromDisk(job_id)
1108 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1109 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1110 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1111 3d6c5566 Guido Trotter
      if old_path == new_path:
1112 3d6c5566 Guido Trotter
        # job already archived (future case)
1113 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1114 3d6c5566 Guido Trotter
      else:
1115 3d6c5566 Guido Trotter
        # non-archived case
1116 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1117 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1118 3d6c5566 Guido Trotter
      return None
1119 162c8636 Guido Trotter
1120 162c8636 Guido Trotter
    self._memcache[job_id] = job
1121 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1122 162c8636 Guido Trotter
    return job
1123 162c8636 Guido Trotter
1124 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1125 162c8636 Guido Trotter
    """Load the given job file from disk.
1126 162c8636 Guido Trotter

1127 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1128 162c8636 Guido Trotter

1129 162c8636 Guido Trotter
    @type job_id: string
1130 162c8636 Guido Trotter
    @param job_id: job identifier
1131 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1132 162c8636 Guido Trotter
    @return: either None or the job object
1133 162c8636 Guido Trotter

1134 162c8636 Guido Trotter
    """
1135 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1136 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1137 f1da30e6 Michael Hanselmann
    try:
1138 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1139 162c8636 Guido Trotter
    except EnvironmentError, err:
1140 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1141 f1da30e6 Michael Hanselmann
        return None
1142 f1da30e6 Michael Hanselmann
      raise
1143 13998ef2 Michael Hanselmann
1144 94ed59a5 Iustin Pop
    try:
1145 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1146 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1147 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1148 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1149 94ed59a5 Iustin Pop
1150 ac0930b9 Iustin Pop
    return job
1151 f1da30e6 Michael Hanselmann
1152 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk without raising load errors.

    This wraps L{_LoadJobFromDisk}: a corrupted job file or an
    environment error while reading it is logged (with traceback) and
    reported to the caller as None instead of being raised.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      loaded = self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      loaded = None

    return loaded
1170 0f9c08dc Guido Trotter
1171 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    # The flag is represented solely by the existence of the drain file
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1183 686d7433 Iustin Pop
1184 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1185 20571a26 Guido Trotter
    """Update the queue size.
1186 20571a26 Guido Trotter

1187 20571a26 Guido Trotter
    """
1188 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1189 20571a26 Guido Trotter
1190 20571a26 Guido Trotter
  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    The flag is stored on disk as the (empty) queue drain file and
    mirrored in memory in C{self._drained}.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE

    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      utils.WriteFile(drain_file, data="", close=True)

    # Keep the in-memory copy in sync with the on-disk marker
    self._drained = drain_flag

    return True
1207 3ccafd0e Iustin Pop
1208 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    The job is written to the queue on disk, counted in the cached queue
    size and added to the in-memory job cache. The returned job object
    is what callers hand over to the worker pool.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # Persist the job first; the size counter is only bumped afterwards
    self.UpdateJobUnlocked(new_job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    return new_job
1244 f1da30e6 Michael Hanselmann
1245 2971c913 Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    A fresh job ID is allocated, the job is created through
    L{_SubmitJobUnlocked} and then handed to the worker pool.

    @see: L{_SubmitJobUnlocked}
    @return: the ID of the newly submitted job

    """
    new_id = self._NewSerialsUnlocked(1)[0]
    queued_job = self._SubmitJobUnlocked(new_id, ops)
    self._wpool.AddTask(queued_job)
    return new_id
1256 2971c913 Iustin Pop
1257 2971c913 Iustin Pop
  @utils.LockedMethod
1258 2971c913 Iustin Pop
  @_RequireOpenQueue
1259 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1260 2971c913 Iustin Pop
    """Create and store multiple jobs.
1261 2971c913 Iustin Pop

1262 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1263 2971c913 Iustin Pop

1264 2971c913 Iustin Pop
    """
1265 2971c913 Iustin Pop
    results = []
1266 7beb1e53 Guido Trotter
    tasks = []
1267 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1268 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1269 2971c913 Iustin Pop
      try:
1270 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1271 2971c913 Iustin Pop
        status = True
1272 7beb1e53 Guido Trotter
        data = job_id
1273 2971c913 Iustin Pop
      except errors.GenericError, err:
1274 2971c913 Iustin Pop
        data = str(err)
1275 2971c913 Iustin Pop
        status = False
1276 2971c913 Iustin Pop
      results.append((status, data))
1277 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1278 2971c913 Iustin Pop
1279 2971c913 Iustin Pop
    return results
1280 2971c913 Iustin Pop
1281 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    Whenever a job has been modified, this must be called so that the
    new state is serialized, written to the job's queue file (and, if
    requested, replicated to the other nodes) and waiters on the job
    are woken up.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)

    logging.debug("Writing job %s to %s", job.id, job_file)
    self._UpdateJobQueueFile(job_file, serialized, replicate)

    # Notify waiters about potential changes
    job.change.notifyAll()
1302 dfe57c22 Michael Hanselmann
1303 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    The actual waiting is delegated to a L{_WaitForJobChangesHelper}
    instance, which is always closed again before returning.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    waiter = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
                                      prev_log_serial, self)
    try:
      changes = waiter.WaitForChanges(timeout)
    finally:
      # Release the helper regardless of how the wait ended
      waiter.Close()

    return changes
1333 dfe57c22 Michael Hanselmann
1334 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet, i.e. while
    it is either still queued or waiting for its locks.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple (boolean, string)
    @return: whether the cancel request was accepted, and a message
        describing the outcome

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      # Not picked up yet, can be canceled right away
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Any other status means the job is no longer waiting
    logging.debug("Job %s is no longer waiting in the queue", job.id)
    return (False, "Job %s is no longer waiting in the queue" % job.id)
1370 fbf0262f Michael Hanselmann
1371 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    All unfinished opcodes of the job are set to the canceled status;
    the job is written back to disk even if marking them fails.

    @type job: L{_QueuedJob}
    @param job: the job to cancel

    """
    reason = "Job canceled by request"
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, reason)
    finally:
      self.UpdateJobUnlocked(job)
1381 188c5e0a Michael Hanselmann
1382 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Only jobs which have reached a final status (canceled, success or
    error) are archived, by renaming their queue file to the archive
    path; all other jobs are skipped.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    final_statuses = (constants.JOB_STATUS_CANCELED,
                      constants.JOB_STATUS_SUCCESS,
                      constants.JOB_STATUS_ERROR)

    to_archive = []
    renames = []
    for job in jobs:
      if job.CalcStatus() in final_statuses:
        to_archive.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in to_archive))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()

    return len(to_archive)
1419 78d12585 Michael Hanselmann
1420 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}, which loads
    the job first and refuses if it cannot be found.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      # Exactly one archived job means success
      return self._ArchiveJobsUnlocked([job]) == 1

    logging.debug("Job %s not found", job_id)
    return False
1441 07cd723a Iustin Pop
1442 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered (and, failing that, the received
    timestamp). The special '-1' age will cause archival of all jobs
    (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: number
    @param timeout: time in seconds after which no further jobs are
        examined
    @rtype: tuple (int, int)
    @return: the number of archived jobs and the number of job IDs
        which were not reached before the timeout

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    deadline = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    batch = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > deadline:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp available for the job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      if age == -1 or now - job_age[0] > age:
        batch.append(job)

        # Archive 10 jobs at a time
        if len(batch) >= 10:
          archived_count += self._ArchiveJobsUnlocked(batch)
          batch = []

    # Flush whatever is left over from the last, incomplete batch
    if batch:
      archived_count += self._ArchiveJobsUnlocked(batch)

    return (archived_count, len(all_job_ids) - last_touched)
1497 07cd723a Iustin Pop
1498 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    When specific job IDs are requested, jobs which cannot be loaded
    are reported as None so that the result lines up with the request;
    when listing all jobs, such jobs are silently left out.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    list_all = not job_ids
    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    result = []
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is None:
        if not list_all:
          result.append(None)
      else:
        result.append(job.GetInfo(fields))

    return result
1526 e2715f69 Michael Hanselmann
1527 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    # Close the queue file lock and drop the reference to it
    self._queue_filelock.Close()
    self._queue_filelock = None