Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ adb6d685

History | View | Annotate | Download (45.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 6c2549d6 Guido Trotter
try:
41 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
42 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
43 6c2549d6 Guido Trotter
except ImportError:
44 6c2549d6 Guido Trotter
  import pyinotify
45 6c2549d6 Guido Trotter
46 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
47 e2715f69 Michael Hanselmann
from ganeti import constants
48 f1da30e6 Michael Hanselmann
from ganeti import serializer
49 e2715f69 Michael Hanselmann
from ganeti import workerpool
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 e2715f69 Michael Hanselmann
57 fbf0262f Michael Hanselmann
58 1daae384 Iustin Pop
# Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
59 58b22b6e Michael Hanselmann
# NOTE(review): not referenced in the visible part of this module; presumably
# the number of archived jobs grouped into one archive directory -- confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000
60 e2715f69 Michael Hanselmann
61 498ae1cc Iustin Pop
62 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception raised to signal that a job has to be canceled.

  It carries no data of its own; raising it is the whole message.

  """
66 fbf0262f Michael Hanselmann
67 fbf0262f Michael Hanselmann
68 70552c46 Michael Hanselmann
def TimeStampNow():
  """Return the current time as a split timestamp.

  The value of C{time.time()} is handed to L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
76 70552c46 Michael Hanselmann
77 70552c46 Michael Hanselmann
78 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
79 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
80 e2715f69 Michael Hanselmann

81 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
82 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
83 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
84 ea03467c Iustin Pop
  @ivar status: the current status
85 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
86 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
87 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
88 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
89 f1048938 Iustin Pop

90 e2715f69 Michael Hanselmann
  """
91 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
92 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
93 66d895a8 Iustin Pop
               "__weakref__"]
94 66d895a8 Iustin Pop
95 85f03e0d Michael Hanselmann
  def __init__(self, op):
96 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
97 ea03467c Iustin Pop

98 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
99 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
100 ea03467c Iustin Pop

101 ea03467c Iustin Pop
    """
102 85f03e0d Michael Hanselmann
    self.input = op
103 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
104 85f03e0d Michael Hanselmann
    self.result = None
105 85f03e0d Michael Hanselmann
    self.log = []
106 70552c46 Michael Hanselmann
    self.start_timestamp = None
107 b9b5abcb Iustin Pop
    self.exec_timestamp = None
108 70552c46 Michael Hanselmann
    self.end_timestamp = None
109 f1da30e6 Michael Hanselmann
110 f1da30e6 Michael Hanselmann
  @classmethod
111 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
112 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
113 ea03467c Iustin Pop

114 ea03467c Iustin Pop
    @type state: dict
115 ea03467c Iustin Pop
    @param state: the serialized state
116 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
117 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
118 ea03467c Iustin Pop

119 ea03467c Iustin Pop
    """
120 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
121 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
122 85f03e0d Michael Hanselmann
    obj.status = state["status"]
123 85f03e0d Michael Hanselmann
    obj.result = state["result"]
124 85f03e0d Michael Hanselmann
    obj.log = state["log"]
125 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
126 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
127 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
128 f1da30e6 Michael Hanselmann
    return obj
129 f1da30e6 Michael Hanselmann
130 f1da30e6 Michael Hanselmann
  def Serialize(self):
131 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
132 ea03467c Iustin Pop

133 ea03467c Iustin Pop
    @rtype: dict
134 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
135 ea03467c Iustin Pop

136 ea03467c Iustin Pop
    """
137 6c5a7090 Michael Hanselmann
    return {
138 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
139 6c5a7090 Michael Hanselmann
      "status": self.status,
140 6c5a7090 Michael Hanselmann
      "result": self.result,
141 6c5a7090 Michael Hanselmann
      "log": self.log,
142 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
143 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
144 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
145 6c5a7090 Michael Hanselmann
      }
146 f1048938 Iustin Pop
147 e2715f69 Michael Hanselmann
148 e2715f69 Michael Hanselmann
class _QueuedJob(object):
149 e2715f69 Michael Hanselmann
  """In-memory job representation.
150 e2715f69 Michael Hanselmann

151 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
152 ea03467c Iustin Pop
  be taken care of by users of this class.
153 ea03467c Iustin Pop

154 ea03467c Iustin Pop
  @type queue: L{JobQueue}
155 ea03467c Iustin Pop
  @ivar queue: the parent queue
156 ea03467c Iustin Pop
  @ivar id: the job ID
157 ea03467c Iustin Pop
  @type ops: list
158 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
159 ea03467c Iustin Pop
  @type log_serial: int
160 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
161 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
162 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
163 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
164 ef2df7d3 Michael Hanselmann
  @ivar lock_status: In-memory locking information for debugging
165 e2715f69 Michael Hanselmann

166 e2715f69 Michael Hanselmann
  """
167 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
168 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
169 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
170 ef2df7d3 Michael Hanselmann
               "lock_status", "change",
171 66d895a8 Iustin Pop
               "__weakref__"]
172 66d895a8 Iustin Pop
173 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
174 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
175 ea03467c Iustin Pop

176 ea03467c Iustin Pop
    @type queue: L{JobQueue}
177 ea03467c Iustin Pop
    @param queue: our parent queue
178 ea03467c Iustin Pop
    @type job_id: job_id
179 ea03467c Iustin Pop
    @param job_id: our job id
180 ea03467c Iustin Pop
    @type ops: list
181 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
182 ea03467c Iustin Pop
        in _QueuedOpCodes
183 ea03467c Iustin Pop

184 ea03467c Iustin Pop
    """
185 e2715f69 Michael Hanselmann
    if not ops:
186 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
187 e2715f69 Michael Hanselmann
188 85f03e0d Michael Hanselmann
    self.queue = queue
189 f1da30e6 Michael Hanselmann
    self.id = job_id
190 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
191 6c5a7090 Michael Hanselmann
    self.log_serial = 0
192 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
193 c56ec146 Iustin Pop
    self.start_timestamp = None
194 c56ec146 Iustin Pop
    self.end_timestamp = None
195 6c5a7090 Michael Hanselmann
196 ef2df7d3 Michael Hanselmann
    # In-memory attributes
197 ef2df7d3 Michael Hanselmann
    self.lock_status = None
198 ef2df7d3 Michael Hanselmann
199 9fa2e150 Michael Hanselmann
  def __repr__(self):
200 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
201 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
202 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
203 9fa2e150 Michael Hanselmann
204 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
205 9fa2e150 Michael Hanselmann
206 f1da30e6 Michael Hanselmann
  @classmethod
207 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
208 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
209 ea03467c Iustin Pop

210 ea03467c Iustin Pop
    @type queue: L{JobQueue}
211 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
212 ea03467c Iustin Pop
    @type state: dict
213 ea03467c Iustin Pop
    @param state: the serialized state
214 ea03467c Iustin Pop
    @rtype: _JobQueue
215 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
216 ea03467c Iustin Pop

217 ea03467c Iustin Pop
    """
218 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
219 85f03e0d Michael Hanselmann
    obj.queue = queue
220 85f03e0d Michael Hanselmann
    obj.id = state["id"]
221 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
222 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
223 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
224 6c5a7090 Michael Hanselmann
225 ef2df7d3 Michael Hanselmann
    # In-memory attributes
226 ef2df7d3 Michael Hanselmann
    obj.lock_status = None
227 ef2df7d3 Michael Hanselmann
228 6c5a7090 Michael Hanselmann
    obj.ops = []
229 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
230 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
231 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
232 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
233 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
234 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
235 6c5a7090 Michael Hanselmann
236 f1da30e6 Michael Hanselmann
    return obj
237 f1da30e6 Michael Hanselmann
238 f1da30e6 Michael Hanselmann
  def Serialize(self):
239 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
240 ea03467c Iustin Pop

241 ea03467c Iustin Pop
    @rtype: dict
242 ea03467c Iustin Pop
    @return: the serialized state
243 ea03467c Iustin Pop

244 ea03467c Iustin Pop
    """
245 f1da30e6 Michael Hanselmann
    return {
246 f1da30e6 Michael Hanselmann
      "id": self.id,
247 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
248 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
249 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
250 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
251 f1da30e6 Michael Hanselmann
      }
252 f1da30e6 Michael Hanselmann
253 85f03e0d Michael Hanselmann
  def CalcStatus(self):
254 ea03467c Iustin Pop
    """Compute the status of this job.
255 ea03467c Iustin Pop

256 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
257 ea03467c Iustin Pop
    based on their status, computes the job status.
258 ea03467c Iustin Pop

259 ea03467c Iustin Pop
    The algorithm is:
260 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
261 ea03467c Iustin Pop
        status will be the same
262 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
263 ea03467c Iustin Pop
          - waitlock
264 fbf0262f Michael Hanselmann
          - canceling
265 ea03467c Iustin Pop
          - running
266 ea03467c Iustin Pop

267 ea03467c Iustin Pop
        will determine the job status
268 ea03467c Iustin Pop

269 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
270 ea03467c Iustin Pop
        and the job status will be the same
271 ea03467c Iustin Pop

272 ea03467c Iustin Pop
    @return: the job status
273 ea03467c Iustin Pop

274 ea03467c Iustin Pop
    """
275 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
276 e2715f69 Michael Hanselmann
277 e2715f69 Michael Hanselmann
    all_success = True
278 85f03e0d Michael Hanselmann
    for op in self.ops:
279 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
280 e2715f69 Michael Hanselmann
        continue
281 e2715f69 Michael Hanselmann
282 e2715f69 Michael Hanselmann
      all_success = False
283 e2715f69 Michael Hanselmann
284 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
285 e2715f69 Michael Hanselmann
        pass
286 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
287 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
288 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
289 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
290 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
291 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
292 fbf0262f Michael Hanselmann
        break
293 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
294 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
295 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
296 f1da30e6 Michael Hanselmann
        break
297 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
298 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
299 4cb1d919 Michael Hanselmann
        break
300 e2715f69 Michael Hanselmann
301 e2715f69 Michael Hanselmann
    if all_success:
302 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
303 e2715f69 Michael Hanselmann
304 e2715f69 Michael Hanselmann
    return status
305 e2715f69 Michael Hanselmann
306 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
307 ea03467c Iustin Pop
    """Selectively returns the log entries.
308 ea03467c Iustin Pop

309 ea03467c Iustin Pop
    @type newer_than: None or int
310 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
311 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
312 ea03467c Iustin Pop
        than this value
313 ea03467c Iustin Pop
    @rtype: list
314 ea03467c Iustin Pop
    @return: the list of the log entries selected
315 ea03467c Iustin Pop

316 ea03467c Iustin Pop
    """
317 6c5a7090 Michael Hanselmann
    if newer_than is None:
318 6c5a7090 Michael Hanselmann
      serial = -1
319 6c5a7090 Michael Hanselmann
    else:
320 6c5a7090 Michael Hanselmann
      serial = newer_than
321 6c5a7090 Michael Hanselmann
322 6c5a7090 Michael Hanselmann
    entries = []
323 6c5a7090 Michael Hanselmann
    for op in self.ops:
324 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325 6c5a7090 Michael Hanselmann
326 6c5a7090 Michael Hanselmann
    return entries
327 6c5a7090 Michael Hanselmann
328 6a290889 Guido Trotter
  def GetInfo(self, fields):
329 6a290889 Guido Trotter
    """Returns information about a job.
330 6a290889 Guido Trotter

331 6a290889 Guido Trotter
    @type fields: list
332 6a290889 Guido Trotter
    @param fields: names of fields to return
333 6a290889 Guido Trotter
    @rtype: list
334 6a290889 Guido Trotter
    @return: list with one element for each field
335 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
336 6a290889 Guido Trotter
        has been passed
337 6a290889 Guido Trotter

338 6a290889 Guido Trotter
    """
339 6a290889 Guido Trotter
    row = []
340 6a290889 Guido Trotter
    for fname in fields:
341 6a290889 Guido Trotter
      if fname == "id":
342 6a290889 Guido Trotter
        row.append(self.id)
343 6a290889 Guido Trotter
      elif fname == "status":
344 6a290889 Guido Trotter
        row.append(self.CalcStatus())
345 6a290889 Guido Trotter
      elif fname == "ops":
346 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
347 6a290889 Guido Trotter
      elif fname == "opresult":
348 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
349 6a290889 Guido Trotter
      elif fname == "opstatus":
350 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
351 6a290889 Guido Trotter
      elif fname == "oplog":
352 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
353 6a290889 Guido Trotter
      elif fname == "opstart":
354 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
355 6a290889 Guido Trotter
      elif fname == "opexec":
356 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
357 6a290889 Guido Trotter
      elif fname == "opend":
358 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
359 6a290889 Guido Trotter
      elif fname == "received_ts":
360 6a290889 Guido Trotter
        row.append(self.received_timestamp)
361 6a290889 Guido Trotter
      elif fname == "start_ts":
362 6a290889 Guido Trotter
        row.append(self.start_timestamp)
363 6a290889 Guido Trotter
      elif fname == "end_ts":
364 6a290889 Guido Trotter
        row.append(self.end_timestamp)
365 6a290889 Guido Trotter
      elif fname == "lock_status":
366 6a290889 Guido Trotter
        row.append(self.lock_status)
367 6a290889 Guido Trotter
      elif fname == "summary":
368 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
369 6a290889 Guido Trotter
      else:
370 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
371 6a290889 Guido Trotter
    return row
372 6a290889 Guido Trotter
373 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
374 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
375 34327f51 Iustin Pop

376 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
377 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
378 34327f51 Iustin Pop
    finalised are not changed.
379 34327f51 Iustin Pop

380 34327f51 Iustin Pop
    @param status: a given opcode status
381 34327f51 Iustin Pop
    @param result: the opcode result
382 34327f51 Iustin Pop

383 34327f51 Iustin Pop
    """
384 34327f51 Iustin Pop
    not_marked = True
385 34327f51 Iustin Pop
    for op in self.ops:
386 34327f51 Iustin Pop
      if op.status in constants.OPS_FINALIZED:
387 34327f51 Iustin Pop
        assert not_marked, "Finalized opcodes found after non-finalized ones"
388 34327f51 Iustin Pop
        continue
389 34327f51 Iustin Pop
      op.status = status
390 34327f51 Iustin Pop
      op.result = result
391 34327f51 Iustin Pop
      not_marked = False
392 34327f51 Iustin Pop
393 f1048938 Iustin Pop
394 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks tying the execution of one opcode to its job and queue.

  The methods update the wrapped opcode and its job while it is being
  processed.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was marked as canceling in the meantime

    """
    queue = self._queue
    op = self._op

    queue.acquire()
    try:
      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      op.status = constants.OP_STATUS_RUNNING
      op.exec_timestamp = TimeStampNow()
    finally:
      queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either a single argument, the message (logged with type
    C{constants.ELOG_MESSAGE}), or two arguments, the log type and the
    message.

    """
    nargs = len(args)
    assert nargs < 3

    if nargs != 1:
      (log_type, log_msg) = args
    else:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()

    queue = self._queue
    job = self._job

    queue.acquire()
    try:
      job.log_serial += 1
      self._op.log.append((job.log_serial, timestamp, log_type, log_msg))
      queue.UpdateJobUnlocked(job, replicate=False)
    finally:
      queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
472 ef2df7d3 Michael Hanselmann
473 031a3e57 Michael Hanselmann
474 6c2549d6 Guido Trotter
class _WaitForJobChangesHelper(object):
475 6c2549d6 Guido Trotter
  """Helper class using initofy to wait for changes in a job file.
476 6c2549d6 Guido Trotter

477 6c2549d6 Guido Trotter
  This class takes a previous job status and serial, and alerts the client when
478 6c2549d6 Guido Trotter
  the current job status has changed.
479 6c2549d6 Guido Trotter

480 6c2549d6 Guido Trotter
  @type job_id: string
481 6c2549d6 Guido Trotter
  @ivar job_id: id of the job we're watching
482 6c2549d6 Guido Trotter
  @type prev_job_info: string
483 6c2549d6 Guido Trotter
  @ivar prev_job_info: previous job info, as passed by the luxi client
484 6c2549d6 Guido Trotter
  @type prev_log_serial: string
485 6c2549d6 Guido Trotter
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
486 6c2549d6 Guido Trotter
  @type queue: L{JobQueue}
487 6c2549d6 Guido Trotter
  @ivar queue: job queue (used for a few utility functions)
488 6c2549d6 Guido Trotter
  @type job_path: string
489 6c2549d6 Guido Trotter
  @ivar job_path: absolute path of the job file
490 6c2549d6 Guido Trotter
  @type wm: pyinotify.WatchManager (or None)
491 6c2549d6 Guido Trotter
  @ivar wm: inotify watch manager to watch for changes
492 6c2549d6 Guido Trotter
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
493 6c2549d6 Guido Trotter
  @ivar inotify_handler: single file event handler, used for watching
494 6c2549d6 Guido Trotter
  @type notifier: pyinotify.Notifier
495 6c2549d6 Guido Trotter
  @ivar notifier: inotify single-threaded notifier, used for watching
496 6c2549d6 Guido Trotter

497 6c2549d6 Guido Trotter
  """
498 6c2549d6 Guido Trotter
499 6c2549d6 Guido Trotter
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
500 6c2549d6 Guido Trotter
    self.job_id = job_id
501 6c2549d6 Guido Trotter
    self.fields = fields
502 6c2549d6 Guido Trotter
    self.prev_job_info = prev_job_info
503 6c2549d6 Guido Trotter
    self.prev_log_serial = prev_log_serial
504 6c2549d6 Guido Trotter
    self.queue = queue
505 6c2549d6 Guido Trotter
    # pylint: disable-msg=W0212
506 6c2549d6 Guido Trotter
    self.job_path = self.queue._GetJobPath(self.job_id)
507 6c2549d6 Guido Trotter
    self.wm = None
508 6c2549d6 Guido Trotter
    self.inotify_handler = None
509 6c2549d6 Guido Trotter
    self.notifier = None
510 6c2549d6 Guido Trotter
511 6c2549d6 Guido Trotter
  def _SetupInotify(self):
512 6c2549d6 Guido Trotter
    """Create the inotify
513 6c2549d6 Guido Trotter

514 6c2549d6 Guido Trotter
    @raises errors.InotifyError: if the notifier cannot be setup
515 6c2549d6 Guido Trotter

516 6c2549d6 Guido Trotter
    """
517 6c2549d6 Guido Trotter
    if self.wm:
518 6c2549d6 Guido Trotter
      return
519 6c2549d6 Guido Trotter
    self.wm = pyinotify.WatchManager()
520 6c2549d6 Guido Trotter
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
521 6c2549d6 Guido Trotter
                                                                self.OnInotify,
522 6c2549d6 Guido Trotter
                                                                self.job_path)
523 6c2549d6 Guido Trotter
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
524 6c2549d6 Guido Trotter
    self.inotify_handler.enable()
525 6c2549d6 Guido Trotter
526 6c2549d6 Guido Trotter
  def _LoadDiskStatus(self):
527 6c2549d6 Guido Trotter
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
528 6c2549d6 Guido Trotter
    if not job:
529 6c2549d6 Guido Trotter
      raise errors.JobLost()
530 6c2549d6 Guido Trotter
    self.job_status = job.CalcStatus()
531 6c2549d6 Guido Trotter
532 6c2549d6 Guido Trotter
    job_info = job.GetInfo(self.fields)
533 6c2549d6 Guido Trotter
    log_entries = job.GetLogEntries(self.prev_log_serial)
534 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
535 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
536 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
537 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
538 6c2549d6 Guido Trotter
    # significantly different.
539 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
540 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
541 6c2549d6 Guido Trotter
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
542 6c2549d6 Guido Trotter
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
543 6c2549d6 Guido Trotter
544 6c2549d6 Guido Trotter
  def _CheckForChanges(self):
545 6c2549d6 Guido Trotter
    self._LoadDiskStatus()
546 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
547 6c2549d6 Guido Trotter
    # no changes.
548 6c2549d6 Guido Trotter
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
549 6c2549d6 Guido Trotter
                                constants.JOB_STATUS_RUNNING,
550 6c2549d6 Guido Trotter
                                constants.JOB_STATUS_WAITLOCK) or
551 6c2549d6 Guido Trotter
        self.prev_job_info != self.job_info or
552 6c2549d6 Guido Trotter
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
553 6c2549d6 Guido Trotter
      logging.debug("Job %s changed", self.job_id)
554 6c2549d6 Guido Trotter
      return (self.job_info, self.log_entries)
555 6c2549d6 Guido Trotter
556 6c2549d6 Guido Trotter
    raise utils.RetryAgain()
557 6c2549d6 Guido Trotter
558 6c2549d6 Guido Trotter
  def OnInotify(self, notifier_enabled):
559 6c2549d6 Guido Trotter
    if not notifier_enabled:
560 6c2549d6 Guido Trotter
      self.inotify_handler.enable()
561 6c2549d6 Guido Trotter
562 6c2549d6 Guido Trotter
  def WaitFn(self, timeout):
563 6c2549d6 Guido Trotter
    self._SetupInotify()
564 6c2549d6 Guido Trotter
    if self.notifier.check_events(timeout*1000):
565 6c2549d6 Guido Trotter
      self.notifier.read_events()
566 6c2549d6 Guido Trotter
    self.notifier.process_events()
567 6c2549d6 Guido Trotter
568 6c2549d6 Guido Trotter
  def WaitForChanges(self, timeout):
569 6c2549d6 Guido Trotter
    try:
570 6c2549d6 Guido Trotter
      return utils.Retry(self._CheckForChanges,
571 6c2549d6 Guido Trotter
                         utils.RETRY_REMAINING_TIME,
572 6c2549d6 Guido Trotter
                         timeout,
573 6c2549d6 Guido Trotter
                         wait_fn=self.WaitFn)
574 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
575 6c2549d6 Guido Trotter
      return None
576 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
577 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
578 6c2549d6 Guido Trotter
579 6c2549d6 Guido Trotter
  def Close(self):
580 6c2549d6 Guido Trotter
    if self.wm:
581 6c2549d6 Guido Trotter
      self.notifier.stop()
582 6c2549d6 Guido Trotter
583 6c2549d6 Guido Trotter
584 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  Each worker executes the opcodes of one job at a time through
  L{RunTask}, taking the queue lock whenever job state is modified.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    The opcodes are executed in order. Opcodes already marked as
    successful are skipped. The first opcode that fails is marked as
    failed and ends the processing of the job. A canceled opcode makes the
    whole job be canceled via C{queue.CancelJobUnlocked}. No exception is
    propagated to the caller; errors are logged instead. In all cases the
    job's end timestamp is set and the job is written back before
    returning.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            # Mark the opcode as waiting for locks and persist that state
            # while holding the queue lock
            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            # Record the successful result, again under the queue lock
            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            # Record the failure on the opcode; the job is written back even
            # if recording the error itself fails (inner finally)
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            # Re-raise so that the remaining opcodes are not executed; the
            # outer handlers below log the exception
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        # Everything else is logged and swallowed here as well
        logging.exception("Unhandled exception")
    finally:
      # Runs on every exit path: finalize the job under the queue lock
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          # Read these while still holding the lock; they are only used for
          # the log message below
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)
695 e2715f69 Michael Hanselmann
696 e2715f69 Michael Hanselmann
697 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool used for processing jobs.

  The pool is named "JobQueue", is created with L{JOBQUEUE_THREADS} and
  uses L{_JobQueueWorker} as its worker class.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the owning queue.

    @param queue: the queue object this pool belongs to; it is stored as
        C{self.queue}

    """
    pool_name = "JobQueue"
    super(_JobQueueWorkerPool, self).__init__(pool_name, JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
706 e2715f69 Michael Hanselmann
707 e2715f69 Michael Hanselmann
708 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
709 6c881c52 Iustin Pop
  """Decorator for "public" functions.
710 ea03467c Iustin Pop

711 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
712 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
713 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
714 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
715 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
716 ea03467c Iustin Pop

717 6c881c52 Iustin Pop
  @warning: Use this decorator only after utils.LockedMethod!
718 f1da30e6 Michael Hanselmann

719 6c881c52 Iustin Pop
  Example::
720 6c881c52 Iustin Pop
    @utils.LockedMethod
721 6c881c52 Iustin Pop
    @_RequireOpenQueue
722 6c881c52 Iustin Pop
    def Example(self):
723 6c881c52 Iustin Pop
      pass
724 db37da70 Michael Hanselmann

725 6c881c52 Iustin Pop
  """
726 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
727 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
728 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
729 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
730 6c881c52 Iustin Pop
  return wrapper
731 db37da70 Michael Hanselmann
732 db37da70 Michael Hanselmann
733 6c881c52 Iustin Pop
class JobQueue(object):
734 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
735 db37da70 Michael Hanselmann

736 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
737 6c881c52 Iustin Pop

738 6c881c52 Iustin Pop
  """
739 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
740 db37da70 Michael Hanselmann
741 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    If anything fails while the jobs are being inspected, the workers of
    the freshly created pool are terminated and the exception is re-raised.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # Cache of loaded jobs; weak values, so entries vanish once a job is no
    # longer referenced elsewhere
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    # (mapping of node name to primary IP, master candidates only)
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          # (and also for the very last job)
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          # Queued jobs are handed to the worker pool
          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          # Jobs that were in progress have their unfinished opcodes marked
          # as failed; the job is written back even if the marking fails
          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Do not leave worker threads behind if the setup failed
      self._wpool.TerminateWorkers()
      raise
833 85f03e0d Michael Hanselmann
834 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. If the node is not a
    master candidate it is dropped from the list of known nodes and nothing
    is replicated to it. Otherwise all current job files and the serial
    file are uploaded to it and the node is recorded. Purge and upload
    failures are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    purge_result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    purge_error = purge_result.fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication of
      # the job ids
      self._nodes.pop(node_name, None)
      return

    # Upload the whole queue excluding archived jobs, followed by the
    # current serial file
    upload_paths = [self._GetJobPath(job_id)
                    for job_id in self._GetJobIDsUnlocked()]
    upload_paths.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload_paths:
      # Read file content
      content = utils.ReadFile(path)

      upload_result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                         [node.primary_ip],
                                                         path, content)
      upload_error = upload_result[node_name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, node_name, upload_error)

    self._nodes[node_name] = node.primary_ip
878 d2e03a33 Michael Hanselmann
879 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    The node is dropped from the list of known nodes; an unknown name is
    silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      del self._nodes[node_name]
    except KeyError:
      pass
889 23752136 Michael Hanselmann
890 7e950d31 Iustin Pop
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Every node whose result carries a failure message is logged as an
    error. An additional error is logged when the failed nodes outnumber
    the successful ones plus the master node. Nothing is returned and
    nothing is raised for failed nodes.

    @param result: the data as returned from the rpc call, indexed by node
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    ok_nodes = []
    bad_nodes = []

    for node in nodes:
      node_result = result[node]
      error = node_result.fail_msg
      if not error:
        ok_nodes.append(node)
        continue

      bad_nodes.append(node)
      logging.error("RPC call %s (%s) failed on node %s: %s",
                    node_result.call, failmsg, node, error)

    # +1 for the master node
    if len(bad_nodes) > (len(ok_nodes) + 1):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
921 e74798c1 Michael Hanselmann
922 99aabbed Iustin Pop
  def _GetNodeIp(self):
    """Returns the known node names together with their addresses.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the matching node addresses, in
        the same order

    """
    names = self._nodes.keys()
    return names, [self._nodes[node_name] for node_name in names]
933 99aabbed Iustin Pop
934 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and optionally replicates it to all nodes.

    The file is always written on the local node first. Only when
    C{replicate} is true is it then sent to all other known nodes, with
    the outcome checked by L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if not replicate:
      return

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
954 23752136 Michael Hanselmann
955 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    Every (old, new) pair is renamed on the local node first, creating
    missing directories. The same list of renames is then sent to all
    other known nodes and the outcome is checked by L{_CheckRpcResult}.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for src_path, dst_path in rename:
      utils.RenameFile(src_path, dst_path, mkdir=True)

    # ... and on all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
973 abc1f2ce Michael Hanselmann
974 7e950d31 Iustin Pop
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    At the moment the result is simply C{str(job_id)}, produced only after
    the ID has been validated. Keeping the conversion in one place means a
    future change of the job id format stays local to this function.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the ID is not an integer or is negative

    """
    if isinstance(job_id, (int, long)):
      if job_id < 0:
        raise errors.ProgrammerError("Job ID %s is negative" % job_id)
      return str(job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
994 85f03e0d Michael Hanselmann
995 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into archive directories by the integer quotient of
    their ID and L{JOBS_PER_ARCHIVE_DIRECTORY}.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: "/" only yields an integer under Python 2's
    # classic division, while "//" always does
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
1006 58b22b6e Michael Hanselmann
1007 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The new
    serial is written to the serial file (and replicated) before the
    in-memory counter is advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    # The new IDs start right after the last serial. Starting at
    # self._last_serial itself would return count + 1 entries, and the last
    # entry of one call would be the first entry of the next one.
    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1032 f1da30e6 Michael Hanselmann
1033 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<id>} and lives in L{constants.QUEUE_DIR}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1044 f1da30e6 Michael Hanselmann
1045 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<id>} and lives in the job's archive
    directory (see L{_GetArchiveDirectory}) below
    L{constants.JOB_QUEUE_ARCHIVE_DIR}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                          file_name)
1057 0cb94105 Michael Hanselmann
1058 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    Only the queue directory on disk is examined: all jobs are required
    to be present on disk, so the _memcache cannot hold any extra IDs.
    Files whose name does not match L{_RE_JOB_FILE} are ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = [self._RE_JOB_FILE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]
    if not sort:
      return job_ids
    return utils.NiceSort(job_ids)
1079 911a495b Iustin Pop
1080 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    A job file that cannot be parsed is moved to the archive (unless its
    path already equals the archive path) and C{None} is returned.
    C{None} is also returned when there is no job file for the ID.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    if job is None:
      # The job file does not exist. None must not be put into the cache:
      # it cannot be weakly referenced, so storing it in the
      # WeakValueDictionary would raise TypeError.
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
1114 162c8636 Guido Trotter
1115 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1116 162c8636 Guido Trotter
    """Load the given job file from disk.
1117 162c8636 Guido Trotter

1118 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1119 162c8636 Guido Trotter

1120 162c8636 Guido Trotter
    @type job_id: string
1121 162c8636 Guido Trotter
    @param job_id: job identifier
1122 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1123 162c8636 Guido Trotter
    @return: either None or the job object
1124 162c8636 Guido Trotter

1125 162c8636 Guido Trotter
    """
1126 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1127 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1128 f1da30e6 Michael Hanselmann
    try:
1129 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1130 162c8636 Guido Trotter
    except EnvironmentError, err:
1131 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1132 f1da30e6 Michael Hanselmann
        return None
1133 f1da30e6 Michael Hanselmann
      raise
1134 13998ef2 Michael Hanselmann
1135 94ed59a5 Iustin Pop
    try:
1136 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1137 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1138 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1139 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1140 94ed59a5 Iustin Pop
1141 ac0930b9 Iustin Pop
    return job
1142 f1da30e6 Michael Hanselmann
1143 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
    """Load a job from disk without letting load errors escape.

    This wraps L{_LoadJobFromDisk}: a corrupted job file or an
    environment error raised while reading it is logged, and the caller
    gets None instead of an exception.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: the loaded job, or None if it could not be loaded

    """
    job = None
    try:
      job = self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
    return job
1161 0f9c08dc Guido Trotter
1162 686d7433 Iustin Pop
  @staticmethod
1163 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
    """Check whether the queue is marked for draining.

    The check tests for the existence of the queue drain file, which
    makes this a per-node flag. In the future this can be moved to the
    config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
1174 686d7433 Iustin Pop
1175 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1176 20571a26 Guido Trotter
    """Update the queue size.
1177 20571a26 Guido Trotter

1178 20571a26 Guido Trotter
    """
1179 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1180 20571a26 Guido Trotter
1181 20571a26 Guido Trotter
  @utils.LockedMethod
1182 20571a26 Guido Trotter
  @_RequireOpenQueue
1183 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
    """Set or clear the drain flag of the queue.

    The flag is recorded on disk, by writing or removing the queue
    drain file, and remembered in memory afterwards.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      # An empty file is enough, only its existence matters
      utils.WriteFile(drain_file, data="", close=True)

    self._drained = drain_flag

    return True
1198 3ccafd0e Iustin Pop
1199 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1200 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create a new job, store it on disk and register it in the cache.

    The job is written out through L{UpdateJobUnlocked}, counted in the
    cached queue size and added to the memory cache. This method does
    not schedule the job itself; the job object is returned so that the
    caller can do so.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # Write to disk first, and only then account for the new job
    self.UpdateJobUnlocked(new_job)
    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    return new_job
1235 f1da30e6 Michael Hanselmann
1236 2971c913 Iustin Pop
  @utils.LockedMethod
1237 2971c913 Iustin Pop
  @_RequireOpenQueue
1238 2971c913 Iustin Pop
  def SubmitJob(self, ops):
    """Allocate a job ID, create the job and hand it to the worker pool.

    @see: L{_SubmitJobUnlocked}

    @param ops: passed unchanged to L{_SubmitJobUnlocked}
    @return: the ID allocated for the new job

    """
    new_id = self._NewSerialsUnlocked(1)[0]
    queued_job = self._SubmitJobUnlocked(new_id, ops)
    self._wpool.AddTask(queued_job)
    return new_id
1247 2971c913 Iustin Pop
1248 2971c913 Iustin Pop
  @utils.LockedMethod
1249 2971c913 Iustin Pop
  @_RequireOpenQueue
1250 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1251 2971c913 Iustin Pop
    """Create and store multiple jobs.
1252 2971c913 Iustin Pop

1253 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1254 2971c913 Iustin Pop

1255 2971c913 Iustin Pop
    """
1256 2971c913 Iustin Pop
    results = []
1257 7beb1e53 Guido Trotter
    tasks = []
1258 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1259 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1260 2971c913 Iustin Pop
      try:
1261 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1262 2971c913 Iustin Pop
        status = True
1263 7beb1e53 Guido Trotter
        data = job_id
1264 2971c913 Iustin Pop
      except errors.GenericError, err:
1265 2971c913 Iustin Pop
        data = str(err)
1266 2971c913 Iustin Pop
        status = False
1267 2971c913 Iustin Pop
      results.append((status, data))
1268 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1269 2971c913 Iustin Pop
1270 2971c913 Iustin Pop
    return results
1271 2971c913 Iustin Pop
1272 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1273 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
    """Write a job's current state to its file in the queue.

    This needs to be called after a job has been modified, so that the
    changes reach the disk and, if requested, the other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._UpdateJobQueueFile(job_file, serialized, replicate)
1290 ac0930b9 Iustin Pop
1291 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Wait until a job changes or the timeout expires.

    The waiting is delegated to a L{_WaitForJobChangesHelper} object,
    which is closed again before returning, whether or not the wait
    succeeded.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    waiter = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
                                      prev_log_serial, self)
    try:
      changes = waiter.WaitForChanges(timeout)
    finally:
      waiter.Close()
    return changes
1321 dfe57c22 Michael Hanselmann
1322 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1323 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1324 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
    """Cancel a job that is still waiting in the queue.

    Only jobs whose status is "queued" or "waiting for locks" are
    accepted. A queued job is canceled right away via
    L{CancelJobUnlocked}. For a job waiting for locks the unfinished
    opcodes are marked as canceling and the job is written back.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: (success, message), where success is a boolean and message
        a string describing the outcome

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Any other status cannot be canceled anymore
    logging.debug("Job %s is no longer waiting in the queue", job.id)
    return (False, "Job %s is no longer waiting in the queue" % job.id)
1358 fbf0262f Michael Hanselmann
1359 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1360 fbf0262f Michael Hanselmann
  def CancelJobUnlocked(self, job):
    """Mark the unfinished opcodes of a job as canceled and save the job.

    The job is written back through L{UpdateJobUnlocked} even if
    marking the opcodes raises an exception.

    @param job: the job to cancel

    """
    message = "Job canceled by request"
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, message)
    finally:
      self.UpdateJobUnlocked(job)
1369 188c5e0a Michael Hanselmann
1370 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1371 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
    """Move finished jobs to the archive.

    Jobs whose status is not one of canceled, success or error are left
    alone. For all others the job file is renamed from its queue path to
    its archive path, after which the cached queue size is refreshed.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    finished = (constants.JOB_STATUS_CANCELED,
                constants.JOB_STATUS_SUCCESS,
                constants.JOB_STATUS_ERROR)

    to_archive = []
    renames = []
    for job in jobs:
      if job.CalcStatus() in finished:
        to_archive.append(job)
        renames.append((self._GetJobPath(job.id),
                        self._GetArchivedJobPath(job.id)))
      else:
        logging.debug("Job %s is not yet done", job.id)

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(renames)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in to_archive))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(to_archive)
1407 78d12585 Michael Hanselmann
1408 07cd723a Iustin Pop
  @utils.LockedMethod
1409 07cd723a Iustin Pop
  @_RequireOpenQueue
1410 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
    """Archive a single job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      archived = self._ArchiveJobsUnlocked([job])
      return archived == 1

    logging.debug("Job %s not found", job_id)
    return False
1429 07cd723a Iustin Pop
1430 07cd723a Iustin Pop
  @utils.LockedMethod
1431 07cd723a Iustin Pop
  @_RequireOpenQueue
1432 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and if that is missing too, the
    received timestamp. The special '-1' age will cause archival of all
    jobs (that are not running or queued).

    Jobs are examined in the order returned by L{_GetJobIDsUnlocked}
    and archived in batches of ten. Once C{timeout} seconds have passed
    no further jobs are examined; jobs already selected by then are
    still archived.

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int or float
    @param timeout: maximum time, in seconds, to spend examining jobs
    @rtype: tuple
    @return: (number of archived jobs, number of jobs which were not
        examined because the timeout expired)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Count the job as examined only after the timeout check; otherwise
      # the job we stopped at would be missing from the number of
      # unchecked jobs returned below.
      last_touched = idx + 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      # Use the most recent timestamp available for the job
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared with the
      # current time (presumably the seconds part)
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
1485 07cd723a Iustin Pop
1486 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
    """Return the requested fields for a set of jobs.

    Jobs are read through L{SafeLoadJobFromDisk}. When all jobs are
    listed, those which cannot be loaded are left out of the result;
    when specific job IDs were given, every job which cannot be loaded
    is reported as None instead.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    list_all = not job_ids
    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    result = []
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is None:
        if not list_all:
          result.append(None)
      else:
        result.append(job.GetInfo(fields))

    return result
1514 e2715f69 Michael Hanselmann
1515 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
1516 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1517 e2715f69 Michael Hanselmann
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue file
    lock, which is then forgotten.

    """
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_filelock
    queue_lock.Close()
    self._queue_filelock = None