Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 3c0d60d0

History | View | Annotate | Download (45.9 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 f1da30e6 Michael Hanselmann
import errno
35 f1da30e6 Michael Hanselmann
import re
36 f1048938 Iustin Pop
import time
37 5685c1a5 Michael Hanselmann
import weakref
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 e2715f69 Michael Hanselmann
57 fbf0262f Michael Hanselmann
58 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
59 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
60 e2715f69 Michael Hanselmann
61 99bd4f0a Guido Trotter
# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in
62 99bd4f0a Guido Trotter
# shared mode to ensure exclusion with legacy code, which acquires it
63 99bd4f0a Guido Trotter
# exclusively. It can not be acquired at all only after concurrency with all
64 99bd4f0a Guido Trotter
# new and legacy code is ensured.
65 99bd4f0a Guido Trotter
_big_jqueue_lock = locking.SharedLock()
66 99bd4f0a Guido Trotter
67 498ae1cc Iustin Pop
68 9728ae5d Iustin Pop
class CancelJob(Exception):
69 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
70 fbf0262f Michael Hanselmann

71 fbf0262f Michael Hanselmann
  """
72 fbf0262f Michael Hanselmann
73 fbf0262f Michael Hanselmann
74 70552c46 Michael Hanselmann
def TimeStampNow():
75 ea03467c Iustin Pop
  """Returns the current timestamp.
76 ea03467c Iustin Pop

77 ea03467c Iustin Pop
  @rtype: tuple
78 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
79 ea03467c Iustin Pop

80 ea03467c Iustin Pop
  """
81 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
82 70552c46 Michael Hanselmann
83 70552c46 Michael Hanselmann
84 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
85 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
86 e2715f69 Michael Hanselmann

87 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
88 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
89 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
90 ea03467c Iustin Pop
  @ivar status: the current status
91 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
92 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
93 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
95 f1048938 Iustin Pop

96 e2715f69 Michael Hanselmann
  """
97 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
98 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
99 66d895a8 Iustin Pop
               "__weakref__"]
100 66d895a8 Iustin Pop
101 85f03e0d Michael Hanselmann
  def __init__(self, op):
102 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
103 ea03467c Iustin Pop

104 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
105 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
106 ea03467c Iustin Pop

107 ea03467c Iustin Pop
    """
108 85f03e0d Michael Hanselmann
    self.input = op
109 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
110 85f03e0d Michael Hanselmann
    self.result = None
111 85f03e0d Michael Hanselmann
    self.log = []
112 70552c46 Michael Hanselmann
    self.start_timestamp = None
113 b9b5abcb Iustin Pop
    self.exec_timestamp = None
114 70552c46 Michael Hanselmann
    self.end_timestamp = None
115 f1da30e6 Michael Hanselmann
116 f1da30e6 Michael Hanselmann
  @classmethod
117 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
118 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
119 ea03467c Iustin Pop

120 ea03467c Iustin Pop
    @type state: dict
121 ea03467c Iustin Pop
    @param state: the serialized state
122 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
123 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
124 ea03467c Iustin Pop

125 ea03467c Iustin Pop
    """
126 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
127 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128 85f03e0d Michael Hanselmann
    obj.status = state["status"]
129 85f03e0d Michael Hanselmann
    obj.result = state["result"]
130 85f03e0d Michael Hanselmann
    obj.log = state["log"]
131 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
132 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
133 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
134 f1da30e6 Michael Hanselmann
    return obj
135 f1da30e6 Michael Hanselmann
136 f1da30e6 Michael Hanselmann
  def Serialize(self):
137 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
138 ea03467c Iustin Pop

139 ea03467c Iustin Pop
    @rtype: dict
140 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
141 ea03467c Iustin Pop

142 ea03467c Iustin Pop
    """
143 6c5a7090 Michael Hanselmann
    return {
144 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
145 6c5a7090 Michael Hanselmann
      "status": self.status,
146 6c5a7090 Michael Hanselmann
      "result": self.result,
147 6c5a7090 Michael Hanselmann
      "log": self.log,
148 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
149 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
150 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
151 6c5a7090 Michael Hanselmann
      }
152 f1048938 Iustin Pop
153 e2715f69 Michael Hanselmann
154 e2715f69 Michael Hanselmann
class _QueuedJob(object):
155 e2715f69 Michael Hanselmann
  """In-memory job representation.
156 e2715f69 Michael Hanselmann

157 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
158 ea03467c Iustin Pop
  be taken care of by users of this class.
159 ea03467c Iustin Pop

160 ea03467c Iustin Pop
  @type queue: L{JobQueue}
161 ea03467c Iustin Pop
  @ivar queue: the parent queue
162 ea03467c Iustin Pop
  @ivar id: the job ID
163 ea03467c Iustin Pop
  @type ops: list
164 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
165 ea03467c Iustin Pop
  @type log_serial: int
166 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
167 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
168 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
169 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
170 ef2df7d3 Michael Hanselmann
  @ivar lock_status: In-memory locking information for debugging
171 e2715f69 Michael Hanselmann

172 e2715f69 Michael Hanselmann
  """
173 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
174 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
175 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
176 ef2df7d3 Michael Hanselmann
               "lock_status", "change",
177 66d895a8 Iustin Pop
               "__weakref__"]
178 66d895a8 Iustin Pop
179 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
180 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
181 ea03467c Iustin Pop

182 ea03467c Iustin Pop
    @type queue: L{JobQueue}
183 ea03467c Iustin Pop
    @param queue: our parent queue
184 ea03467c Iustin Pop
    @type job_id: job_id
185 ea03467c Iustin Pop
    @param job_id: our job id
186 ea03467c Iustin Pop
    @type ops: list
187 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
188 ea03467c Iustin Pop
        in _QueuedOpCodes
189 ea03467c Iustin Pop

190 ea03467c Iustin Pop
    """
191 e2715f69 Michael Hanselmann
    if not ops:
192 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
193 e2715f69 Michael Hanselmann
194 85f03e0d Michael Hanselmann
    self.queue = queue
195 f1da30e6 Michael Hanselmann
    self.id = job_id
196 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
197 6c5a7090 Michael Hanselmann
    self.log_serial = 0
198 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
199 c56ec146 Iustin Pop
    self.start_timestamp = None
200 c56ec146 Iustin Pop
    self.end_timestamp = None
201 6c5a7090 Michael Hanselmann
202 ef2df7d3 Michael Hanselmann
    # In-memory attributes
203 ef2df7d3 Michael Hanselmann
    self.lock_status = None
204 ef2df7d3 Michael Hanselmann
205 9fa2e150 Michael Hanselmann
  def __repr__(self):
206 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
208 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209 9fa2e150 Michael Hanselmann
210 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
211 9fa2e150 Michael Hanselmann
212 f1da30e6 Michael Hanselmann
  @classmethod
213 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
214 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
215 ea03467c Iustin Pop

216 ea03467c Iustin Pop
    @type queue: L{JobQueue}
217 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
218 ea03467c Iustin Pop
    @type state: dict
219 ea03467c Iustin Pop
    @param state: the serialized state
220 ea03467c Iustin Pop
    @rtype: _JobQueue
221 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
222 ea03467c Iustin Pop

223 ea03467c Iustin Pop
    """
224 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
225 85f03e0d Michael Hanselmann
    obj.queue = queue
226 85f03e0d Michael Hanselmann
    obj.id = state["id"]
227 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
228 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
229 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
230 6c5a7090 Michael Hanselmann
231 ef2df7d3 Michael Hanselmann
    # In-memory attributes
232 ef2df7d3 Michael Hanselmann
    obj.lock_status = None
233 ef2df7d3 Michael Hanselmann
234 6c5a7090 Michael Hanselmann
    obj.ops = []
235 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
236 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
237 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
238 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
239 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
240 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
241 6c5a7090 Michael Hanselmann
242 f1da30e6 Michael Hanselmann
    return obj
243 f1da30e6 Michael Hanselmann
244 f1da30e6 Michael Hanselmann
  def Serialize(self):
245 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
246 ea03467c Iustin Pop

247 ea03467c Iustin Pop
    @rtype: dict
248 ea03467c Iustin Pop
    @return: the serialized state
249 ea03467c Iustin Pop

250 ea03467c Iustin Pop
    """
251 f1da30e6 Michael Hanselmann
    return {
252 f1da30e6 Michael Hanselmann
      "id": self.id,
253 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
254 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
255 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
256 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
257 f1da30e6 Michael Hanselmann
      }
258 f1da30e6 Michael Hanselmann
259 85f03e0d Michael Hanselmann
  def CalcStatus(self):
260 ea03467c Iustin Pop
    """Compute the status of this job.
261 ea03467c Iustin Pop

262 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
263 ea03467c Iustin Pop
    based on their status, computes the job status.
264 ea03467c Iustin Pop

265 ea03467c Iustin Pop
    The algorithm is:
266 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
267 ea03467c Iustin Pop
        status will be the same
268 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
269 ea03467c Iustin Pop
          - waitlock
270 fbf0262f Michael Hanselmann
          - canceling
271 ea03467c Iustin Pop
          - running
272 ea03467c Iustin Pop

273 ea03467c Iustin Pop
        will determine the job status
274 ea03467c Iustin Pop

275 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
276 ea03467c Iustin Pop
        and the job status will be the same
277 ea03467c Iustin Pop

278 ea03467c Iustin Pop
    @return: the job status
279 ea03467c Iustin Pop

280 ea03467c Iustin Pop
    """
281 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
282 e2715f69 Michael Hanselmann
283 e2715f69 Michael Hanselmann
    all_success = True
284 85f03e0d Michael Hanselmann
    for op in self.ops:
285 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
286 e2715f69 Michael Hanselmann
        continue
287 e2715f69 Michael Hanselmann
288 e2715f69 Michael Hanselmann
      all_success = False
289 e2715f69 Michael Hanselmann
290 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
291 e2715f69 Michael Hanselmann
        pass
292 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
293 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
294 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
295 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
296 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
297 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
298 fbf0262f Michael Hanselmann
        break
299 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
300 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
301 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
302 f1da30e6 Michael Hanselmann
        break
303 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
304 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
305 4cb1d919 Michael Hanselmann
        break
306 e2715f69 Michael Hanselmann
307 e2715f69 Michael Hanselmann
    if all_success:
308 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
309 e2715f69 Michael Hanselmann
310 e2715f69 Michael Hanselmann
    return status
311 e2715f69 Michael Hanselmann
312 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
313 ea03467c Iustin Pop
    """Selectively returns the log entries.
314 ea03467c Iustin Pop

315 ea03467c Iustin Pop
    @type newer_than: None or int
316 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
317 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
318 ea03467c Iustin Pop
        than this value
319 ea03467c Iustin Pop
    @rtype: list
320 ea03467c Iustin Pop
    @return: the list of the log entries selected
321 ea03467c Iustin Pop

322 ea03467c Iustin Pop
    """
323 6c5a7090 Michael Hanselmann
    if newer_than is None:
324 6c5a7090 Michael Hanselmann
      serial = -1
325 6c5a7090 Michael Hanselmann
    else:
326 6c5a7090 Michael Hanselmann
      serial = newer_than
327 6c5a7090 Michael Hanselmann
328 6c5a7090 Michael Hanselmann
    entries = []
329 6c5a7090 Michael Hanselmann
    for op in self.ops:
330 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331 6c5a7090 Michael Hanselmann
332 6c5a7090 Michael Hanselmann
    return entries
333 6c5a7090 Michael Hanselmann
334 6a290889 Guido Trotter
  def GetInfo(self, fields):
335 6a290889 Guido Trotter
    """Returns information about a job.
336 6a290889 Guido Trotter

337 6a290889 Guido Trotter
    @type fields: list
338 6a290889 Guido Trotter
    @param fields: names of fields to return
339 6a290889 Guido Trotter
    @rtype: list
340 6a290889 Guido Trotter
    @return: list with one element for each field
341 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
342 6a290889 Guido Trotter
        has been passed
343 6a290889 Guido Trotter

344 6a290889 Guido Trotter
    """
345 6a290889 Guido Trotter
    row = []
346 6a290889 Guido Trotter
    for fname in fields:
347 6a290889 Guido Trotter
      if fname == "id":
348 6a290889 Guido Trotter
        row.append(self.id)
349 6a290889 Guido Trotter
      elif fname == "status":
350 6a290889 Guido Trotter
        row.append(self.CalcStatus())
351 6a290889 Guido Trotter
      elif fname == "ops":
352 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
353 6a290889 Guido Trotter
      elif fname == "opresult":
354 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
355 6a290889 Guido Trotter
      elif fname == "opstatus":
356 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
357 6a290889 Guido Trotter
      elif fname == "oplog":
358 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
359 6a290889 Guido Trotter
      elif fname == "opstart":
360 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
361 6a290889 Guido Trotter
      elif fname == "opexec":
362 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
363 6a290889 Guido Trotter
      elif fname == "opend":
364 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
365 6a290889 Guido Trotter
      elif fname == "received_ts":
366 6a290889 Guido Trotter
        row.append(self.received_timestamp)
367 6a290889 Guido Trotter
      elif fname == "start_ts":
368 6a290889 Guido Trotter
        row.append(self.start_timestamp)
369 6a290889 Guido Trotter
      elif fname == "end_ts":
370 6a290889 Guido Trotter
        row.append(self.end_timestamp)
371 6a290889 Guido Trotter
      elif fname == "lock_status":
372 6a290889 Guido Trotter
        row.append(self.lock_status)
373 6a290889 Guido Trotter
      elif fname == "summary":
374 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
375 6a290889 Guido Trotter
      else:
376 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
377 6a290889 Guido Trotter
    return row
378 6a290889 Guido Trotter
379 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
380 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
381 34327f51 Iustin Pop

382 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
383 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
384 34327f51 Iustin Pop
    finalised are not changed.
385 34327f51 Iustin Pop

386 34327f51 Iustin Pop
    @param status: a given opcode status
387 34327f51 Iustin Pop
    @param result: the opcode result
388 34327f51 Iustin Pop

389 34327f51 Iustin Pop
    """
390 39ed3a98 Guido Trotter
    try:
391 39ed3a98 Guido Trotter
      not_marked = True
392 39ed3a98 Guido Trotter
      for op in self.ops:
393 39ed3a98 Guido Trotter
        if op.status in constants.OPS_FINALIZED:
394 39ed3a98 Guido Trotter
          assert not_marked, "Finalized opcodes found after non-finalized ones"
395 39ed3a98 Guido Trotter
          continue
396 39ed3a98 Guido Trotter
        op.status = status
397 39ed3a98 Guido Trotter
        op.result = result
398 39ed3a98 Guido Trotter
        not_marked = False
399 39ed3a98 Guido Trotter
    finally:
400 39ed3a98 Guido Trotter
      self.queue.UpdateJobUnlocked(self)
401 34327f51 Iustin Pop
402 f1048938 Iustin Pop
403 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
404 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
405 031a3e57 Michael Hanselmann
    """Initializes this class.
406 ea03467c Iustin Pop

407 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
408 031a3e57 Michael Hanselmann
    @param queue: Job queue
409 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
410 031a3e57 Michael Hanselmann
    @param job: Job object
411 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
412 031a3e57 Michael Hanselmann
    @param op: OpCode
413 031a3e57 Michael Hanselmann

414 031a3e57 Michael Hanselmann
    """
415 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
416 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
417 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
418 031a3e57 Michael Hanselmann
419 031a3e57 Michael Hanselmann
    self._queue = queue
420 031a3e57 Michael Hanselmann
    self._job = job
421 031a3e57 Michael Hanselmann
    self._op = op
422 031a3e57 Michael Hanselmann
423 031a3e57 Michael Hanselmann
  def NotifyStart(self):
424 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
425 e92376d7 Iustin Pop

426 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
427 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
428 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
429 031a3e57 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
430 e92376d7 Iustin Pop

431 e92376d7 Iustin Pop
    """
432 3c0d60d0 Guido Trotter
    self._queue.acquire(shared=1)
433 e92376d7 Iustin Pop
    try:
434 031a3e57 Michael Hanselmann
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
435 031a3e57 Michael Hanselmann
                                 constants.OP_STATUS_CANCELING)
436 fbf0262f Michael Hanselmann
437 ef2df7d3 Michael Hanselmann
      # All locks are acquired by now
438 ef2df7d3 Michael Hanselmann
      self._job.lock_status = None
439 ef2df7d3 Michael Hanselmann
440 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
441 031a3e57 Michael Hanselmann
      if self._op.status == constants.OP_STATUS_CANCELING:
442 fbf0262f Michael Hanselmann
        raise CancelJob()
443 fbf0262f Michael Hanselmann
444 031a3e57 Michael Hanselmann
      self._op.status = constants.OP_STATUS_RUNNING
445 b9b5abcb Iustin Pop
      self._op.exec_timestamp = TimeStampNow()
446 e92376d7 Iustin Pop
    finally:
447 031a3e57 Michael Hanselmann
      self._queue.release()
448 031a3e57 Michael Hanselmann
449 3c0d60d0 Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock, shared=1)
450 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
451 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
452 9bf5e01f Guido Trotter

453 9bf5e01f Guido Trotter
    """
454 9bf5e01f Guido Trotter
    self._job.log_serial += 1
455 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
456 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
457 9bf5e01f Guido Trotter
458 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
459 031a3e57 Michael Hanselmann
    """Append a log entry.
460 031a3e57 Michael Hanselmann

461 031a3e57 Michael Hanselmann
    """
462 031a3e57 Michael Hanselmann
    assert len(args) < 3
463 031a3e57 Michael Hanselmann
464 031a3e57 Michael Hanselmann
    if len(args) == 1:
465 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
466 031a3e57 Michael Hanselmann
      log_msg = args[0]
467 031a3e57 Michael Hanselmann
    else:
468 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
469 031a3e57 Michael Hanselmann
470 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
471 031a3e57 Michael Hanselmann
    # precision.
472 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
473 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
474 031a3e57 Michael Hanselmann
475 ef2df7d3 Michael Hanselmann
  def ReportLocks(self, msg):
476 ef2df7d3 Michael Hanselmann
    """Write locking information to the job.
477 ef2df7d3 Michael Hanselmann

478 ef2df7d3 Michael Hanselmann
    Called whenever the LU processor is waiting for a lock or has acquired one.
479 ef2df7d3 Michael Hanselmann

480 ef2df7d3 Michael Hanselmann
    """
481 ef2df7d3 Michael Hanselmann
    # Not getting the queue lock because this is a single assignment
482 ef2df7d3 Michael Hanselmann
    self._job.lock_status = msg
483 ef2df7d3 Michael Hanselmann
484 031a3e57 Michael Hanselmann
485 6c2549d6 Guido Trotter
class _WaitForJobChangesHelper(object):
486 6c2549d6 Guido Trotter
  """Helper class using initofy to wait for changes in a job file.
487 6c2549d6 Guido Trotter

488 6c2549d6 Guido Trotter
  This class takes a previous job status and serial, and alerts the client when
489 6c2549d6 Guido Trotter
  the current job status has changed.
490 6c2549d6 Guido Trotter

491 6c2549d6 Guido Trotter
  @type job_id: string
492 6c2549d6 Guido Trotter
  @ivar job_id: id of the job we're watching
493 6c2549d6 Guido Trotter
  @type prev_job_info: string
494 6c2549d6 Guido Trotter
  @ivar prev_job_info: previous job info, as passed by the luxi client
495 6c2549d6 Guido Trotter
  @type prev_log_serial: string
496 6c2549d6 Guido Trotter
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
497 6c2549d6 Guido Trotter
  @type queue: L{JobQueue}
498 6c2549d6 Guido Trotter
  @ivar queue: job queue (used for a few utility functions)
499 6c2549d6 Guido Trotter
  @type job_path: string
500 6c2549d6 Guido Trotter
  @ivar job_path: absolute path of the job file
501 6c2549d6 Guido Trotter
  @type wm: pyinotify.WatchManager (or None)
502 6c2549d6 Guido Trotter
  @ivar wm: inotify watch manager to watch for changes
503 6c2549d6 Guido Trotter
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
504 6c2549d6 Guido Trotter
  @ivar inotify_handler: single file event handler, used for watching
505 6c2549d6 Guido Trotter
  @type notifier: pyinotify.Notifier
506 6c2549d6 Guido Trotter
  @ivar notifier: inotify single-threaded notifier, used for watching
507 6c2549d6 Guido Trotter

508 6c2549d6 Guido Trotter
  """
509 6c2549d6 Guido Trotter
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
510 6c2549d6 Guido Trotter
    self.job_id = job_id
511 6c2549d6 Guido Trotter
    self.fields = fields
512 6c2549d6 Guido Trotter
    self.prev_job_info = prev_job_info
513 6c2549d6 Guido Trotter
    self.prev_log_serial = prev_log_serial
514 6c2549d6 Guido Trotter
    self.queue = queue
515 6c2549d6 Guido Trotter
    # pylint: disable-msg=W0212
516 6c2549d6 Guido Trotter
    self.job_path = self.queue._GetJobPath(self.job_id)
517 6c2549d6 Guido Trotter
    self.wm = None
518 6c2549d6 Guido Trotter
    self.inotify_handler = None
519 6c2549d6 Guido Trotter
    self.notifier = None
520 6c2549d6 Guido Trotter
521 6c2549d6 Guido Trotter
  def _SetupInotify(self):
522 6c2549d6 Guido Trotter
    """Create the inotify
523 6c2549d6 Guido Trotter

524 6c2549d6 Guido Trotter
    @raises errors.InotifyError: if the notifier cannot be setup
525 6c2549d6 Guido Trotter

526 6c2549d6 Guido Trotter
    """
527 6c2549d6 Guido Trotter
    if self.wm:
528 6c2549d6 Guido Trotter
      return
529 6c2549d6 Guido Trotter
    self.wm = pyinotify.WatchManager()
530 6c2549d6 Guido Trotter
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
531 6c2549d6 Guido Trotter
                                                                self.OnInotify,
532 6c2549d6 Guido Trotter
                                                                self.job_path)
533 6c2549d6 Guido Trotter
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
534 6c2549d6 Guido Trotter
    self.inotify_handler.enable()
535 6c2549d6 Guido Trotter
536 6c2549d6 Guido Trotter
  def _LoadDiskStatus(self):
537 6c2549d6 Guido Trotter
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
538 6c2549d6 Guido Trotter
    if not job:
539 6c2549d6 Guido Trotter
      raise errors.JobLost()
540 6c2549d6 Guido Trotter
    self.job_status = job.CalcStatus()
541 6c2549d6 Guido Trotter
542 6c2549d6 Guido Trotter
    job_info = job.GetInfo(self.fields)
543 6c2549d6 Guido Trotter
    log_entries = job.GetLogEntries(self.prev_log_serial)
544 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
545 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
546 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
547 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
548 6c2549d6 Guido Trotter
    # significantly different.
549 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
550 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
551 6c2549d6 Guido Trotter
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
552 6c2549d6 Guido Trotter
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
553 6c2549d6 Guido Trotter
554 6c2549d6 Guido Trotter
  def _CheckForChanges(self):
555 6c2549d6 Guido Trotter
    self._LoadDiskStatus()
556 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
557 6c2549d6 Guido Trotter
    # no changes.
558 6c2549d6 Guido Trotter
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
559 6c2549d6 Guido Trotter
                                constants.JOB_STATUS_RUNNING,
560 6c2549d6 Guido Trotter
                                constants.JOB_STATUS_WAITLOCK) or
561 6c2549d6 Guido Trotter
        self.prev_job_info != self.job_info or
562 6c2549d6 Guido Trotter
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
563 6c2549d6 Guido Trotter
      logging.debug("Job %s changed", self.job_id)
564 6c2549d6 Guido Trotter
      return (self.job_info, self.log_entries)
565 6c2549d6 Guido Trotter
566 6c2549d6 Guido Trotter
    raise utils.RetryAgain()
567 6c2549d6 Guido Trotter
568 6c2549d6 Guido Trotter
  def OnInotify(self, notifier_enabled):
569 6c2549d6 Guido Trotter
    if not notifier_enabled:
570 6c2549d6 Guido Trotter
      self.inotify_handler.enable()
571 6c2549d6 Guido Trotter
572 6c2549d6 Guido Trotter
  def WaitFn(self, timeout):
573 6c2549d6 Guido Trotter
    self._SetupInotify()
574 6c2549d6 Guido Trotter
    if self.notifier.check_events(timeout*1000):
575 6c2549d6 Guido Trotter
      self.notifier.read_events()
576 6c2549d6 Guido Trotter
    self.notifier.process_events()
577 6c2549d6 Guido Trotter
578 6c2549d6 Guido Trotter
  def WaitForChanges(self, timeout):
579 6c2549d6 Guido Trotter
    try:
580 6c2549d6 Guido Trotter
      return utils.Retry(self._CheckForChanges,
581 6c2549d6 Guido Trotter
                         utils.RETRY_REMAINING_TIME,
582 6c2549d6 Guido Trotter
                         timeout,
583 6c2549d6 Guido Trotter
                         wait_fn=self.WaitFn)
584 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
585 6c2549d6 Guido Trotter
      return None
586 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
587 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
588 6c2549d6 Guido Trotter
589 6c2549d6 Guido Trotter
  def Close(self):
590 6c2549d6 Guido Trotter
    if self.wm:
591 6c2549d6 Guido Trotter
      self.notifier.stop()
592 6c2549d6 Guido Trotter
593 6c2549d6 Guido Trotter
594 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
595 031a3e57 Michael Hanselmann
  """The actual job workers.
596 031a3e57 Michael Hanselmann

597 031a3e57 Michael Hanselmann
  """
598 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
599 e2715f69 Michael Hanselmann
    """Job executor.
600 e2715f69 Michael Hanselmann

601 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
602 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
603 e2715f69 Michael Hanselmann

604 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
605 ea03467c Iustin Pop
    @param job: the job to be processed
606 ea03467c Iustin Pop

607 e2715f69 Michael Hanselmann
    """
608 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
609 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
610 031a3e57 Michael Hanselmann
    queue = job.queue
611 e2715f69 Michael Hanselmann
    try:
612 85f03e0d Michael Hanselmann
      try:
613 85f03e0d Michael Hanselmann
        count = len(job.ops)
614 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
615 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
616 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
617 f6424741 Iustin Pop
            # this is a job that was partially completed before master
618 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
619 f6424741 Iustin Pop
            # are already completed successfully (if any did error
620 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
621 f6424741 Iustin Pop
            # resubmitted for processing)
622 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
623 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
624 f6424741 Iustin Pop
            continue
625 85f03e0d Michael Hanselmann
          try:
626 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
627 d21d09d6 Iustin Pop
                         op_summary)
628 85f03e0d Michael Hanselmann
629 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
630 85f03e0d Michael Hanselmann
            try:
631 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
632 df0fb067 Iustin Pop
                raise CancelJob()
633 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
634 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
635 85f03e0d Michael Hanselmann
              op.result = None
636 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
637 c56ec146 Iustin Pop
              if idx == 0: # first opcode
638 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
639 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
640 85f03e0d Michael Hanselmann
641 38206f3c Iustin Pop
              input_opcode = op.input
642 85f03e0d Michael Hanselmann
            finally:
643 85f03e0d Michael Hanselmann
              queue.release()
644 85f03e0d Michael Hanselmann
645 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
646 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
647 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
648 85f03e0d Michael Hanselmann
649 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
650 85f03e0d Michael Hanselmann
            try:
651 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
652 85f03e0d Michael Hanselmann
              op.result = result
653 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
654 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
655 85f03e0d Michael Hanselmann
            finally:
656 85f03e0d Michael Hanselmann
              queue.release()
657 85f03e0d Michael Hanselmann
658 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
659 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
660 fbf0262f Michael Hanselmann
          except CancelJob:
661 fbf0262f Michael Hanselmann
            # Will be handled further up
662 fbf0262f Michael Hanselmann
            raise
663 85f03e0d Michael Hanselmann
          except Exception, err:
664 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
665 85f03e0d Michael Hanselmann
            try:
666 85f03e0d Michael Hanselmann
              try:
667 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
668 bcb66fca Iustin Pop
                if isinstance(err, errors.GenericError):
669 bcb66fca Iustin Pop
                  op.result = errors.EncodeException(err)
670 bcb66fca Iustin Pop
                else:
671 bcb66fca Iustin Pop
                  op.result = str(err)
672 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
673 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
674 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
675 85f03e0d Michael Hanselmann
              finally:
676 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
677 85f03e0d Michael Hanselmann
            finally:
678 85f03e0d Michael Hanselmann
              queue.release()
679 85f03e0d Michael Hanselmann
            raise
680 85f03e0d Michael Hanselmann
681 fbf0262f Michael Hanselmann
      except CancelJob:
682 3c0d60d0 Guido Trotter
        queue.acquire(shared=1)
683 fbf0262f Michael Hanselmann
        try:
684 39ed3a98 Guido Trotter
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
685 39ed3a98 Guido Trotter
                                "Job canceled by request")
686 fbf0262f Michael Hanselmann
        finally:
687 fbf0262f Michael Hanselmann
          queue.release()
688 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
689 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
690 85f03e0d Michael Hanselmann
      except:
691 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
692 e2715f69 Michael Hanselmann
    finally:
693 3c0d60d0 Guido Trotter
      queue.acquire(shared=1)
694 85f03e0d Michael Hanselmann
      try:
695 65548ed5 Michael Hanselmann
        try:
696 ef2df7d3 Michael Hanselmann
          job.lock_status = None
697 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
698 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
699 65548ed5 Michael Hanselmann
        finally:
700 65548ed5 Michael Hanselmann
          job_id = job.id
701 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
702 85f03e0d Michael Hanselmann
      finally:
703 85f03e0d Michael Hanselmann
        queue.release()
704 ef2df7d3 Michael Hanselmann
705 02fc74da Michael Hanselmann
      logging.info("Finished job %s, status = %s", job_id, status)
706 e2715f69 Michael Hanselmann
707 e2715f69 Michael Hanselmann
708 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
709 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
710 ea03467c Iustin Pop

711 ea03467c Iustin Pop
  """
712 5bdce580 Michael Hanselmann
  def __init__(self, queue):
713 89e2b4d2 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
714 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
715 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
716 5bdce580 Michael Hanselmann
    self.queue = queue
717 e2715f69 Michael Hanselmann
718 e2715f69 Michael Hanselmann
719 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
720 6c881c52 Iustin Pop
  """Decorator for "public" functions.
721 ea03467c Iustin Pop

722 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
723 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
724 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
725 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
726 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
727 ea03467c Iustin Pop

728 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
729 f1da30e6 Michael Hanselmann

730 6c881c52 Iustin Pop
  Example::
731 99bd4f0a Guido Trotter
    @locking.ssynchronized(_big_jqueue_lock)
732 6c881c52 Iustin Pop
    @_RequireOpenQueue
733 6c881c52 Iustin Pop
    def Example(self):
734 6c881c52 Iustin Pop
      pass
735 db37da70 Michael Hanselmann

736 6c881c52 Iustin Pop
  """
737 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
738 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
739 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
740 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
741 6c881c52 Iustin Pop
  return wrapper
742 db37da70 Michael Hanselmann
743 db37da70 Michael Hanselmann
744 6c881c52 Iustin Pop
class JobQueue(object):
745 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
746 db37da70 Michael Hanselmann

747 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
748 6c881c52 Iustin Pop

749 6c881c52 Iustin Pop
  """
750 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
751 db37da70 Michael Hanselmann
752 85f03e0d Michael Hanselmann
  def __init__(self, context):
753 ea03467c Iustin Pop
    """Constructor for JobQueue.
754 ea03467c Iustin Pop

755 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
756 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
757 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
758 ea03467c Iustin Pop
    running).
759 ea03467c Iustin Pop

760 ea03467c Iustin Pop
    @type context: GanetiContext
761 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
762 ea03467c Iustin Pop
        data and other ganeti objects
763 ea03467c Iustin Pop

764 ea03467c Iustin Pop
    """
765 5bdce580 Michael Hanselmann
    self.context = context
766 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
767 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
768 f1da30e6 Michael Hanselmann
769 99bd4f0a Guido Trotter
    self.acquire = _big_jqueue_lock.acquire
770 99bd4f0a Guido Trotter
    self.release = _big_jqueue_lock.release
771 85f03e0d Michael Hanselmann
772 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
773 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
774 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
775 f1da30e6 Michael Hanselmann
776 04ab05ce Michael Hanselmann
    # Read serial file
777 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
778 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
779 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
780 c4beba1c Iustin Pop
781 23752136 Michael Hanselmann
    # Get initial list of nodes
782 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
783 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
784 59303563 Iustin Pop
                       if n.master_candidate)
785 8e00939c Michael Hanselmann
786 8e00939c Michael Hanselmann
    # Remove master node
787 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
788 23752136 Michael Hanselmann
789 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
790 23752136 Michael Hanselmann
791 20571a26 Guido Trotter
    self._queue_size = 0
792 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
793 20571a26 Guido Trotter
    self._drained = self._IsQueueMarkedDrain()
794 20571a26 Guido Trotter
795 85f03e0d Michael Hanselmann
    # Setup worker pool
796 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
797 85f03e0d Michael Hanselmann
    try:
798 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
799 16714921 Michael Hanselmann
      # we're still doing our work.
800 16714921 Michael Hanselmann
      self.acquire()
801 16714921 Michael Hanselmann
      try:
802 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
803 711b5124 Michael Hanselmann
804 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
805 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
806 711b5124 Michael Hanselmann
        lastinfo = time.time()
807 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
808 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
809 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
810 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
811 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
812 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
813 711b5124 Michael Hanselmann
            lastinfo = time.time()
814 711b5124 Michael Hanselmann
815 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
816 711b5124 Michael Hanselmann
817 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
818 16714921 Michael Hanselmann
          if job is None:
819 16714921 Michael Hanselmann
            continue
820 94ed59a5 Iustin Pop
821 16714921 Michael Hanselmann
          status = job.CalcStatus()
822 85f03e0d Michael Hanselmann
823 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
824 16714921 Michael Hanselmann
            self._wpool.AddTask(job)
825 85f03e0d Michael Hanselmann
826 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
827 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
828 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
829 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
830 39ed3a98 Guido Trotter
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
831 39ed3a98 Guido Trotter
                                  "Unclean master daemon shutdown")
832 711b5124 Michael Hanselmann
833 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
834 16714921 Michael Hanselmann
      finally:
835 16714921 Michael Hanselmann
        self.release()
836 16714921 Michael Hanselmann
    except:
837 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
838 16714921 Michael Hanselmann
      raise
839 85f03e0d Michael Hanselmann
840 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
841 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
842 99aabbed Iustin Pop
  def AddNode(self, node):
843 99aabbed Iustin Pop
    """Register a new node with the queue.
844 99aabbed Iustin Pop

845 99aabbed Iustin Pop
    @type node: L{objects.Node}
846 99aabbed Iustin Pop
    @param node: the node object to be added
847 99aabbed Iustin Pop

848 99aabbed Iustin Pop
    """
849 99aabbed Iustin Pop
    node_name = node.name
850 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
851 23752136 Michael Hanselmann
852 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
853 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
854 3cebe102 Michael Hanselmann
    msg = result.fail_msg
855 c8457ce7 Iustin Pop
    if msg:
856 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
857 c8457ce7 Iustin Pop
                      node_name, msg)
858 23752136 Michael Hanselmann
859 59303563 Iustin Pop
    if not node.master_candidate:
860 59303563 Iustin Pop
      # remove if existing, ignoring errors
861 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
862 59303563 Iustin Pop
      # and skip the replication of the job ids
863 59303563 Iustin Pop
      return
864 59303563 Iustin Pop
865 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
866 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
867 23752136 Michael Hanselmann
868 d2e03a33 Michael Hanselmann
    # Upload current serial file
869 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
870 d2e03a33 Michael Hanselmann
871 d2e03a33 Michael Hanselmann
    for file_name in files:
872 9f774ee8 Michael Hanselmann
      # Read file content
873 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
874 9f774ee8 Michael Hanselmann
875 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
876 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
877 a3811745 Michael Hanselmann
                                                  file_name, content)
878 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
879 c8457ce7 Iustin Pop
      if msg:
880 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
881 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
882 d2e03a33 Michael Hanselmann
883 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
884 d2e03a33 Michael Hanselmann
885 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
886 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
887 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
888 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
889 ea03467c Iustin Pop

890 ea03467c Iustin Pop
    @type node_name: str
891 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
892 ea03467c Iustin Pop

893 ea03467c Iustin Pop
    """
894 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
895 23752136 Michael Hanselmann
896 7e950d31 Iustin Pop
  @staticmethod
897 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
898 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
899 ea03467c Iustin Pop

900 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
901 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
902 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
903 ea03467c Iustin Pop

904 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
905 ea03467c Iustin Pop
    @type nodes: list
906 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
907 ea03467c Iustin Pop
    @type failmsg: str
908 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
909 ea03467c Iustin Pop

910 ea03467c Iustin Pop
    """
911 e74798c1 Michael Hanselmann
    failed = []
912 e74798c1 Michael Hanselmann
    success = []
913 e74798c1 Michael Hanselmann
914 e74798c1 Michael Hanselmann
    for node in nodes:
915 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
916 c8457ce7 Iustin Pop
      if msg:
917 e74798c1 Michael Hanselmann
        failed.append(node)
918 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
919 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
920 c8457ce7 Iustin Pop
      else:
921 c8457ce7 Iustin Pop
        success.append(node)
922 e74798c1 Michael Hanselmann
923 e74798c1 Michael Hanselmann
    # +1 for the master node
924 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
925 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
926 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
927 e74798c1 Michael Hanselmann
928 99aabbed Iustin Pop
  def _GetNodeIp(self):
929 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
930 99aabbed Iustin Pop

931 ea03467c Iustin Pop
    @rtype: (list, list)
932 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
933 ea03467c Iustin Pop
        names and the second one with the node addresses
934 ea03467c Iustin Pop

935 99aabbed Iustin Pop
    """
936 99aabbed Iustin Pop
    name_list = self._nodes.keys()
937 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
938 99aabbed Iustin Pop
    return name_list, addr_list
939 99aabbed Iustin Pop
940 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
941 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
942 8e00939c Michael Hanselmann

943 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
944 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
945 ea03467c Iustin Pop

946 ea03467c Iustin Pop
    @type file_name: str
947 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
948 ea03467c Iustin Pop
    @type data: str
949 ea03467c Iustin Pop
    @param data: the new contents of the file
950 4c36bdf5 Guido Trotter
    @type replicate: boolean
951 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
952 ea03467c Iustin Pop

953 8e00939c Michael Hanselmann
    """
954 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
955 8e00939c Michael Hanselmann
956 4c36bdf5 Guido Trotter
    if replicate:
957 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
958 4c36bdf5 Guido Trotter
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
959 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
960 23752136 Michael Hanselmann
961 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
962 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
963 ea03467c Iustin Pop

964 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
965 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
966 ea03467c Iustin Pop

967 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
968 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
969 ea03467c Iustin Pop

970 ea03467c Iustin Pop
    """
971 dd875d32 Michael Hanselmann
    # Rename them locally
972 d7fd1f28 Michael Hanselmann
    for old, new in rename:
973 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
974 abc1f2ce Michael Hanselmann
975 dd875d32 Michael Hanselmann
    # ... and on all nodes
976 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
977 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
978 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
979 abc1f2ce Michael Hanselmann
980 7e950d31 Iustin Pop
  @staticmethod
981 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
982 ea03467c Iustin Pop
    """Convert a job ID to string format.
983 ea03467c Iustin Pop

984 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
985 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
986 ea03467c Iustin Pop
    abstract this change.
987 ea03467c Iustin Pop

988 ea03467c Iustin Pop
    @type job_id: int or long
989 ea03467c Iustin Pop
    @param job_id: the numeric job id
990 ea03467c Iustin Pop
    @rtype: str
991 ea03467c Iustin Pop
    @return: the formatted job id
992 ea03467c Iustin Pop

993 ea03467c Iustin Pop
    """
994 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
995 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
996 85f03e0d Michael Hanselmann
    if job_id < 0:
997 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
998 85f03e0d Michael Hanselmann
999 85f03e0d Michael Hanselmann
    return str(job_id)
1000 85f03e0d Michael Hanselmann
1001 58b22b6e Michael Hanselmann
  @classmethod
1002 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
1003 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
1004 58b22b6e Michael Hanselmann

1005 58b22b6e Michael Hanselmann
    @type job_id: str
1006 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
1007 58b22b6e Michael Hanselmann
    @rtype: str
1008 58b22b6e Michael Hanselmann
    @return: Directory name
1009 58b22b6e Michael Hanselmann

1010 58b22b6e Michael Hanselmann
    """
1011 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1012 58b22b6e Michael Hanselmann
1013 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1014 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1015 f1da30e6 Michael Hanselmann

1016 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1017 f1da30e6 Michael Hanselmann

1018 009e73d0 Iustin Pop
    @type count: integer
1019 009e73d0 Iustin Pop
    @param count: how many serials to return
1020 ea03467c Iustin Pop
    @rtype: str
1021 ea03467c Iustin Pop
    @return: a string representing the job identifier.
1022 f1da30e6 Michael Hanselmann

1023 f1da30e6 Michael Hanselmann
    """
1024 009e73d0 Iustin Pop
    assert count > 0
1025 f1da30e6 Michael Hanselmann
    # New number
1026 009e73d0 Iustin Pop
    serial = self._last_serial + count
1027 f1da30e6 Michael Hanselmann
1028 f1da30e6 Michael Hanselmann
    # Write to file
1029 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1030 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1031 f1da30e6 Michael Hanselmann
1032 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
1033 009e73d0 Iustin Pop
              for v in range(self._last_serial, serial + 1)]
1034 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1035 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1036 f1da30e6 Michael Hanselmann
1037 009e73d0 Iustin Pop
    return result
1038 f1da30e6 Michael Hanselmann
1039 85f03e0d Michael Hanselmann
  @staticmethod
1040 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1041 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1042 ea03467c Iustin Pop

1043 ea03467c Iustin Pop
    @type job_id: str
1044 ea03467c Iustin Pop
    @param job_id: the job identifier
1045 ea03467c Iustin Pop
    @rtype: str
1046 ea03467c Iustin Pop
    @return: the path to the job file
1047 ea03467c Iustin Pop

1048 ea03467c Iustin Pop
    """
1049 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1050 f1da30e6 Michael Hanselmann
1051 58b22b6e Michael Hanselmann
  @classmethod
1052 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
1053 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1054 ea03467c Iustin Pop

1055 ea03467c Iustin Pop
    @type job_id: str
1056 ea03467c Iustin Pop
    @param job_id: the job identifier
1057 ea03467c Iustin Pop
    @rtype: str
1058 ea03467c Iustin Pop
    @return: the path to the archived job file
1059 ea03467c Iustin Pop

1060 ea03467c Iustin Pop
    """
1061 0411c011 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1062 0411c011 Iustin Pop
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1063 0cb94105 Michael Hanselmann
1064 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
1065 911a495b Iustin Pop
    """Return all known job IDs.
1066 911a495b Iustin Pop

1067 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1068 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1069 ac0930b9 Iustin Pop
    extra IDs).
1070 ac0930b9 Iustin Pop

1071 85a1c57d Guido Trotter
    @type sort: boolean
1072 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1073 ea03467c Iustin Pop
    @rtype: list
1074 ea03467c Iustin Pop
    @return: the list of job IDs
1075 ea03467c Iustin Pop

1076 911a495b Iustin Pop
    """
1077 85a1c57d Guido Trotter
    jlist = []
1078 b5b8309d Guido Trotter
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1079 85a1c57d Guido Trotter
      m = self._RE_JOB_FILE.match(filename)
1080 85a1c57d Guido Trotter
      if m:
1081 85a1c57d Guido Trotter
        jlist.append(m.group(1))
1082 85a1c57d Guido Trotter
    if sort:
1083 85a1c57d Guido Trotter
      jlist = utils.NiceSort(jlist)
1084 f0d874fe Iustin Pop
    return jlist
1085 911a495b Iustin Pop
1086 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1087 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1088 ea03467c Iustin Pop

1089 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1090 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1091 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1092 ea03467c Iustin Pop

1093 ea03467c Iustin Pop
    @param job_id: the job id
1094 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1095 ea03467c Iustin Pop
    @return: either None or the job object
1096 ea03467c Iustin Pop

1097 ea03467c Iustin Pop
    """
1098 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1099 5685c1a5 Michael Hanselmann
    if job:
1100 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1101 5685c1a5 Michael Hanselmann
      return job
1102 ac0930b9 Iustin Pop
1103 3d6c5566 Guido Trotter
    try:
1104 3d6c5566 Guido Trotter
      job = self._LoadJobFromDisk(job_id)
1105 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1106 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1107 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1108 3d6c5566 Guido Trotter
      if old_path == new_path:
1109 3d6c5566 Guido Trotter
        # job already archived (future case)
1110 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1111 3d6c5566 Guido Trotter
      else:
1112 3d6c5566 Guido Trotter
        # non-archived case
1113 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1114 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1115 3d6c5566 Guido Trotter
      return None
1116 162c8636 Guido Trotter
1117 162c8636 Guido Trotter
    self._memcache[job_id] = job
1118 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1119 162c8636 Guido Trotter
    return job
1120 162c8636 Guido Trotter
1121 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1122 162c8636 Guido Trotter
    """Load the given job file from disk.
1123 162c8636 Guido Trotter

1124 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1125 162c8636 Guido Trotter

1126 162c8636 Guido Trotter
    @type job_id: string
1127 162c8636 Guido Trotter
    @param job_id: job identifier
1128 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1129 162c8636 Guido Trotter
    @return: either None or the job object
1130 162c8636 Guido Trotter

1131 162c8636 Guido Trotter
    """
1132 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1133 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1134 f1da30e6 Michael Hanselmann
    try:
1135 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1136 162c8636 Guido Trotter
    except EnvironmentError, err:
1137 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1138 f1da30e6 Michael Hanselmann
        return None
1139 f1da30e6 Michael Hanselmann
      raise
1140 13998ef2 Michael Hanselmann
1141 94ed59a5 Iustin Pop
    try:
1142 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1143 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1144 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1145 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1146 94ed59a5 Iustin Pop
1147 ac0930b9 Iustin Pop
    return job
1148 f1da30e6 Michael Hanselmann
1149 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
1150 0f9c08dc Guido Trotter
    """Load the given job file from disk.
1151 0f9c08dc Guido Trotter

1152 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1153 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
1154 0f9c08dc Guido Trotter
    exception is logged.
1155 0f9c08dc Guido Trotter

1156 0f9c08dc Guido Trotter
    @type job_id: string
1157 0f9c08dc Guido Trotter
    @param job_id: job identifier
1158 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
1159 0f9c08dc Guido Trotter
    @return: either None or the job object
1160 0f9c08dc Guido Trotter

1161 0f9c08dc Guido Trotter
    """
1162 0f9c08dc Guido Trotter
    try:
1163 0f9c08dc Guido Trotter
      return self._LoadJobFromDisk(job_id)
1164 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
1165 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
1166 0f9c08dc Guido Trotter
      return None
1167 0f9c08dc Guido Trotter
1168 686d7433 Iustin Pop
  @staticmethod
1169 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
1170 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
1171 686d7433 Iustin Pop

1172 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
1173 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
1174 686d7433 Iustin Pop

1175 ea03467c Iustin Pop
    @rtype: boolean
1176 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
1177 ea03467c Iustin Pop

1178 686d7433 Iustin Pop
    """
1179 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1180 686d7433 Iustin Pop
1181 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1182 20571a26 Guido Trotter
    """Update the queue size.
1183 20571a26 Guido Trotter

1184 20571a26 Guido Trotter
    """
1185 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1186 20571a26 Guido Trotter
1187 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1188 20571a26 Guido Trotter
  @_RequireOpenQueue
1189 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
1190 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1191 3ccafd0e Iustin Pop

1192 ea03467c Iustin Pop
    @type drain_flag: boolean
1193 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1194 ea03467c Iustin Pop

1195 3ccafd0e Iustin Pop
    """
1196 3ccafd0e Iustin Pop
    if drain_flag:
1197 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1198 3ccafd0e Iustin Pop
    else:
1199 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1200 20571a26 Guido Trotter
1201 20571a26 Guido Trotter
    self._drained = drain_flag
1202 20571a26 Guido Trotter
1203 3ccafd0e Iustin Pop
    return True
1204 3ccafd0e Iustin Pop
1205 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1206 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
1207 85f03e0d Michael Hanselmann
    """Create and store a new job.
1208 f1da30e6 Michael Hanselmann

1209 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
1210 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
1211 c3f0a12f Iustin Pop

1212 009e73d0 Iustin Pop
    @type job_id: job ID
1213 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
1214 c3f0a12f Iustin Pop
    @type ops: list
1215 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
1216 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
1217 7beb1e53 Guido Trotter
    @return: the job object to be queued
1218 7beb1e53 Guido Trotter
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1219 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1220 c3f0a12f Iustin Pop

1221 c3f0a12f Iustin Pop
    """
1222 20571a26 Guido Trotter
    # Ok when sharing the big job queue lock, as the drain file is created when
1223 20571a26 Guido Trotter
    # the lock is exclusive.
1224 20571a26 Guido Trotter
    if self._drained:
1225 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1226 f87b405e Michael Hanselmann
1227 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1228 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
1229 f87b405e Michael Hanselmann
1230 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
1231 f1da30e6 Michael Hanselmann
1232 f1da30e6 Michael Hanselmann
    # Write to disk
1233 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
1234 f1da30e6 Michael Hanselmann
1235 20571a26 Guido Trotter
    self._queue_size += 1
1236 20571a26 Guido Trotter
1237 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1238 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1239 ac0930b9 Iustin Pop
1240 7beb1e53 Guido Trotter
    return job
1241 f1da30e6 Michael Hanselmann
1242 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1243 2971c913 Iustin Pop
  @_RequireOpenQueue
1244 2971c913 Iustin Pop
  def SubmitJob(self, ops):
1245 2971c913 Iustin Pop
    """Create and store a new job.
1246 2971c913 Iustin Pop

1247 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1248 2971c913 Iustin Pop

1249 2971c913 Iustin Pop
    """
1250 009e73d0 Iustin Pop
    job_id = self._NewSerialsUnlocked(1)[0]
1251 7beb1e53 Guido Trotter
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1252 7beb1e53 Guido Trotter
    return job_id
1253 2971c913 Iustin Pop
1254 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1255 2971c913 Iustin Pop
  @_RequireOpenQueue
1256 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1257 2971c913 Iustin Pop
    """Create and store multiple jobs.
1258 2971c913 Iustin Pop

1259 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1260 2971c913 Iustin Pop

1261 2971c913 Iustin Pop
    """
1262 2971c913 Iustin Pop
    results = []
1263 7beb1e53 Guido Trotter
    tasks = []
1264 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1265 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1266 2971c913 Iustin Pop
      try:
1267 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1268 2971c913 Iustin Pop
        status = True
1269 7beb1e53 Guido Trotter
        data = job_id
1270 2971c913 Iustin Pop
      except errors.GenericError, err:
1271 2971c913 Iustin Pop
        data = str(err)
1272 2971c913 Iustin Pop
        status = False
1273 2971c913 Iustin Pop
      results.append((status, data))
1274 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1275 2971c913 Iustin Pop
1276 2971c913 Iustin Pop
    return results
1277 2971c913 Iustin Pop
1278 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1279 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
1280 ea03467c Iustin Pop
    """Update a job's on disk storage.
1281 ea03467c Iustin Pop

1282 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1283 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1284 ea03467c Iustin Pop
    nodes.
1285 ea03467c Iustin Pop

1286 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1287 ea03467c Iustin Pop
    @param job: the changed job
1288 4c36bdf5 Guido Trotter
    @type replicate: boolean
1289 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
1290 ea03467c Iustin Pop

1291 ea03467c Iustin Pop
    """
1292 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1293 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1294 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1295 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
1296 ac0930b9 Iustin Pop
1297 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1298 5c735209 Iustin Pop
                        timeout):
1299 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1300 6c5a7090 Michael Hanselmann

1301 6c5a7090 Michael Hanselmann
    @type job_id: string
1302 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1303 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1304 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1305 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1306 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1307 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1308 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1309 5c735209 Iustin Pop
    @type timeout: float
1310 5c735209 Iustin Pop
    @param timeout: maximum time to wait
1311 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1312 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1313 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1314 ea03467c Iustin Pop

1315 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1316 ea03467c Iustin Pop
        we instead return a special value,
1317 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1318 ea03467c Iustin Pop
        as such by the clients
1319 6c5a7090 Michael Hanselmann

1320 6c5a7090 Michael Hanselmann
    """
1321 6c2549d6 Guido Trotter
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1322 6c2549d6 Guido Trotter
                                      prev_log_serial, self)
1323 6bcb1446 Michael Hanselmann
    try:
1324 6c2549d6 Guido Trotter
      return helper.WaitForChanges(timeout)
1325 6c2549d6 Guido Trotter
    finally:
1326 6c2549d6 Guido Trotter
      helper.Close()
1327 dfe57c22 Michael Hanselmann
1328 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1329 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1330 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1331 188c5e0a Michael Hanselmann
    """Cancels a job.
1332 188c5e0a Michael Hanselmann

1333 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1334 ea03467c Iustin Pop

1335 188c5e0a Michael Hanselmann
    @type job_id: string
1336 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1337 188c5e0a Michael Hanselmann

1338 188c5e0a Michael Hanselmann
    """
1339 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1340 188c5e0a Michael Hanselmann
1341 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1342 188c5e0a Michael Hanselmann
    if not job:
1343 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1344 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1345 fbf0262f Michael Hanselmann
1346 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1347 188c5e0a Michael Hanselmann
1348 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1349 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1350 a9e97393 Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1351 a9e97393 Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1352 fbf0262f Michael Hanselmann
1353 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1354 39ed3a98 Guido Trotter
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1355 39ed3a98 Guido Trotter
                            "Job canceled by request")
1356 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1357 188c5e0a Michael Hanselmann
1358 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1359 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1360 39ed3a98 Guido Trotter
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1361 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1362 fbf0262f Michael Hanselmann
1363 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1364 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1365 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1366 c609f802 Michael Hanselmann

1367 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1368 25e7b43f Iustin Pop
    @param jobs: Job objects
1369 d7fd1f28 Michael Hanselmann
    @rtype: int
1370 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1371 c609f802 Michael Hanselmann

1372 c609f802 Michael Hanselmann
    """
1373 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1374 d7fd1f28 Michael Hanselmann
    rename_files = []
1375 d7fd1f28 Michael Hanselmann
    for job in jobs:
1376 d7fd1f28 Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1377 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_SUCCESS,
1378 d7fd1f28 Michael Hanselmann
                                  constants.JOB_STATUS_ERROR):
1379 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1380 d7fd1f28 Michael Hanselmann
        continue
1381 c609f802 Michael Hanselmann
1382 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1383 c609f802 Michael Hanselmann
1384 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1385 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1386 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1387 c609f802 Michael Hanselmann
1388 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1389 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1390 f1da30e6 Michael Hanselmann
1391 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1392 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
1393 d7fd1f28 Michael Hanselmann
1394 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1395 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
1396 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
1397 20571a26 Guido Trotter
    # archived jobs to fix this.
1398 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1399 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1400 78d12585 Michael Hanselmann
1401 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1402 07cd723a Iustin Pop
  @_RequireOpenQueue
1403 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1404 07cd723a Iustin Pop
    """Archives a job.
1405 07cd723a Iustin Pop

1406 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1407 ea03467c Iustin Pop

1408 07cd723a Iustin Pop
    @type job_id: string
1409 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1410 78d12585 Michael Hanselmann
    @rtype: bool
1411 78d12585 Michael Hanselmann
    @return: Whether job was archived
1412 07cd723a Iustin Pop

1413 07cd723a Iustin Pop
    """
1414 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1415 78d12585 Michael Hanselmann
1416 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1417 78d12585 Michael Hanselmann
    if not job:
1418 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1419 78d12585 Michael Hanselmann
      return False
1420 78d12585 Michael Hanselmann
1421 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1422 07cd723a Iustin Pop
1423 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1424 07cd723a Iustin Pop
  @_RequireOpenQueue
1425 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1426 07cd723a Iustin Pop
    """Archives all jobs based on age.
1427 07cd723a Iustin Pop

1428 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1429 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1430 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1431 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1432 07cd723a Iustin Pop

1433 07cd723a Iustin Pop
    @type age: int
1434 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1435 07cd723a Iustin Pop

1436 07cd723a Iustin Pop
    """
1437 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1438 07cd723a Iustin Pop
1439 07cd723a Iustin Pop
    now = time.time()
1440 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1441 f8ad5591 Michael Hanselmann
    archived_count = 0
1442 f8ad5591 Michael Hanselmann
    last_touched = 0
1443 f8ad5591 Michael Hanselmann
1444 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
1445 d7fd1f28 Michael Hanselmann
    pending = []
1446 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1447 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
1448 f8ad5591 Michael Hanselmann
1449 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1450 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1451 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1452 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1453 f8ad5591 Michael Hanselmann
        break
1454 f8ad5591 Michael Hanselmann
1455 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1456 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1457 f8ad5591 Michael Hanselmann
      if job:
1458 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1459 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1460 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1461 f8ad5591 Michael Hanselmann
          else:
1462 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1463 07cd723a Iustin Pop
        else:
1464 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1465 f8ad5591 Michael Hanselmann
1466 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1467 d7fd1f28 Michael Hanselmann
          pending.append(job)
1468 d7fd1f28 Michael Hanselmann
1469 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1470 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1471 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1472 d7fd1f28 Michael Hanselmann
            pending = []
1473 f8ad5591 Michael Hanselmann
1474 d7fd1f28 Michael Hanselmann
    if pending:
1475 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1476 07cd723a Iustin Pop
1477 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
1478 07cd723a Iustin Pop
1479 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1480 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1481 e2715f69 Michael Hanselmann

1482 ea03467c Iustin Pop
    @type job_ids: list
1483 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1484 ea03467c Iustin Pop
    @type fields: list
1485 ea03467c Iustin Pop
    @param fields: names of fields to return
1486 ea03467c Iustin Pop
    @rtype: list
1487 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1488 ea03467c Iustin Pop
        the requested fields
1489 e2715f69 Michael Hanselmann

1490 e2715f69 Michael Hanselmann
    """
1491 85f03e0d Michael Hanselmann
    jobs = []
1492 9f7b4967 Guido Trotter
    list_all = False
1493 9f7b4967 Guido Trotter
    if not job_ids:
1494 9f7b4967 Guido Trotter
      # Since files are added to/removed from the queue atomically, there's no
1495 9f7b4967 Guido Trotter
      # risk of getting the job ids in an inconsistent state.
1496 9f7b4967 Guido Trotter
      job_ids = self._GetJobIDsUnlocked()
1497 9f7b4967 Guido Trotter
      list_all = True
1498 e2715f69 Michael Hanselmann
1499 9f7b4967 Guido Trotter
    for job_id in job_ids:
1500 9f7b4967 Guido Trotter
      job = self.SafeLoadJobFromDisk(job_id)
1501 9f7b4967 Guido Trotter
      if job is not None:
1502 6a290889 Guido Trotter
        jobs.append(job.GetInfo(fields))
1503 9f7b4967 Guido Trotter
      elif not list_all:
1504 9f7b4967 Guido Trotter
        jobs.append(None)
1505 e2715f69 Michael Hanselmann
1506 85f03e0d Michael Hanselmann
    return jobs
1507 e2715f69 Michael Hanselmann
1508 99bd4f0a Guido Trotter
  @locking.ssynchronized(_big_jqueue_lock)
1509 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1510 e2715f69 Michael Hanselmann
  def Shutdown(self):
1511 e2715f69 Michael Hanselmann
    """Stops the job queue.
1512 e2715f69 Michael Hanselmann

1513 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1514 ea03467c Iustin Pop

1515 e2715f69 Michael Hanselmann
    """
1516 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1517 85f03e0d Michael Hanselmann
1518 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
1519 a71f9c7d Guido Trotter
    self._queue_filelock = None