Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 82b22e19

History | View | Annotate | Download (49.2 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 7f93570a Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 f1da30e6 Michael Hanselmann
import errno
35 f1da30e6 Michael Hanselmann
import re
36 f1048938 Iustin Pop
import time
37 5685c1a5 Michael Hanselmann
import weakref
38 498ae1cc Iustin Pop
39 6c2549d6 Guido Trotter
try:
40 6c2549d6 Guido Trotter
  # pylint: disable-msg=E0611
41 6c2549d6 Guido Trotter
  from pyinotify import pyinotify
42 6c2549d6 Guido Trotter
except ImportError:
43 6c2549d6 Guido Trotter
  import pyinotify
44 6c2549d6 Guido Trotter
45 6c2549d6 Guido Trotter
from ganeti import asyncnotifier
46 e2715f69 Michael Hanselmann
from ganeti import constants
47 f1da30e6 Michael Hanselmann
from ganeti import serializer
48 e2715f69 Michael Hanselmann
from ganeti import workerpool
49 99bd4f0a Guido Trotter
from ganeti import locking
50 f1da30e6 Michael Hanselmann
from ganeti import opcodes
51 7a1ecaed Iustin Pop
from ganeti import errors
52 e2715f69 Michael Hanselmann
from ganeti import mcpu
53 7996a135 Iustin Pop
from ganeti import utils
54 04ab05ce Michael Hanselmann
from ganeti import jstore
55 c3f0a12f Iustin Pop
from ganeti import rpc
56 82b22e19 René Nussbaumer
from ganeti import runtime
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 989a8bee Michael Hanselmann
from ganeti import compat
59 e2715f69 Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
62 58b22b6e Michael Hanselmann
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63 e2715f69 Michael Hanselmann
64 ebb80afa Guido Trotter
# member lock names to be passed to @ssynchronized decorator
65 ebb80afa Guido Trotter
_LOCK = "_lock"
66 ebb80afa Guido Trotter
_QUEUE = "_queue"
67 99bd4f0a Guido Trotter
68 498ae1cc Iustin Pop
69 9728ae5d Iustin Pop
class CancelJob(Exception):
70 fbf0262f Michael Hanselmann
  """Special exception to cancel a job.
71 fbf0262f Michael Hanselmann

72 fbf0262f Michael Hanselmann
  """
73 fbf0262f Michael Hanselmann
74 fbf0262f Michael Hanselmann
75 70552c46 Michael Hanselmann
def TimeStampNow():
76 ea03467c Iustin Pop
  """Returns the current timestamp.
77 ea03467c Iustin Pop

78 ea03467c Iustin Pop
  @rtype: tuple
79 ea03467c Iustin Pop
  @return: the current time in the (seconds, microseconds) format
80 ea03467c Iustin Pop

81 ea03467c Iustin Pop
  """
82 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
83 70552c46 Michael Hanselmann
84 70552c46 Michael Hanselmann
85 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
86 5bbd3f7f Michael Hanselmann
  """Encapsulates an opcode object.
87 e2715f69 Michael Hanselmann

88 ea03467c Iustin Pop
  @ivar log: holds the execution log and consists of tuples
89 ea03467c Iustin Pop
  of the form C{(log_serial, timestamp, level, message)}
90 ea03467c Iustin Pop
  @ivar input: the OpCode we encapsulate
91 ea03467c Iustin Pop
  @ivar status: the current status
92 ea03467c Iustin Pop
  @ivar result: the result of the LU execution
93 ea03467c Iustin Pop
  @ivar start_timestamp: timestamp for the start of the execution
94 b9b5abcb Iustin Pop
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95 ea03467c Iustin Pop
  @ivar stop_timestamp: timestamp for the end of the execution
96 f1048938 Iustin Pop

97 e2715f69 Michael Hanselmann
  """
98 66d895a8 Iustin Pop
  __slots__ = ["input", "status", "result", "log",
99 b9b5abcb Iustin Pop
               "start_timestamp", "exec_timestamp", "end_timestamp",
100 66d895a8 Iustin Pop
               "__weakref__"]
101 66d895a8 Iustin Pop
102 85f03e0d Michael Hanselmann
  def __init__(self, op):
103 ea03467c Iustin Pop
    """Constructor for the _QuededOpCode.
104 ea03467c Iustin Pop

105 ea03467c Iustin Pop
    @type op: L{opcodes.OpCode}
106 ea03467c Iustin Pop
    @param op: the opcode we encapsulate
107 ea03467c Iustin Pop

108 ea03467c Iustin Pop
    """
109 85f03e0d Michael Hanselmann
    self.input = op
110 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
111 85f03e0d Michael Hanselmann
    self.result = None
112 85f03e0d Michael Hanselmann
    self.log = []
113 70552c46 Michael Hanselmann
    self.start_timestamp = None
114 b9b5abcb Iustin Pop
    self.exec_timestamp = None
115 70552c46 Michael Hanselmann
    self.end_timestamp = None
116 f1da30e6 Michael Hanselmann
117 f1da30e6 Michael Hanselmann
  @classmethod
118 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
119 ea03467c Iustin Pop
    """Restore the _QueuedOpCode from the serialized form.
120 ea03467c Iustin Pop

121 ea03467c Iustin Pop
    @type state: dict
122 ea03467c Iustin Pop
    @param state: the serialized state
123 ea03467c Iustin Pop
    @rtype: _QueuedOpCode
124 ea03467c Iustin Pop
    @return: a new _QueuedOpCode instance
125 ea03467c Iustin Pop

126 ea03467c Iustin Pop
    """
127 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
128 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
129 85f03e0d Michael Hanselmann
    obj.status = state["status"]
130 85f03e0d Michael Hanselmann
    obj.result = state["result"]
131 85f03e0d Michael Hanselmann
    obj.log = state["log"]
132 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
133 b9b5abcb Iustin Pop
    obj.exec_timestamp = state.get("exec_timestamp", None)
134 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
135 f1da30e6 Michael Hanselmann
    return obj
136 f1da30e6 Michael Hanselmann
137 f1da30e6 Michael Hanselmann
  def Serialize(self):
138 ea03467c Iustin Pop
    """Serializes this _QueuedOpCode.
139 ea03467c Iustin Pop

140 ea03467c Iustin Pop
    @rtype: dict
141 ea03467c Iustin Pop
    @return: the dictionary holding the serialized state
142 ea03467c Iustin Pop

143 ea03467c Iustin Pop
    """
144 6c5a7090 Michael Hanselmann
    return {
145 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
146 6c5a7090 Michael Hanselmann
      "status": self.status,
147 6c5a7090 Michael Hanselmann
      "result": self.result,
148 6c5a7090 Michael Hanselmann
      "log": self.log,
149 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
150 b9b5abcb Iustin Pop
      "exec_timestamp": self.exec_timestamp,
151 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
152 6c5a7090 Michael Hanselmann
      }
153 f1048938 Iustin Pop
154 e2715f69 Michael Hanselmann
155 e2715f69 Michael Hanselmann
class _QueuedJob(object):
156 e2715f69 Michael Hanselmann
  """In-memory job representation.
157 e2715f69 Michael Hanselmann

158 ea03467c Iustin Pop
  This is what we use to track the user-submitted jobs. Locking must
159 ea03467c Iustin Pop
  be taken care of by users of this class.
160 ea03467c Iustin Pop

161 ea03467c Iustin Pop
  @type queue: L{JobQueue}
162 ea03467c Iustin Pop
  @ivar queue: the parent queue
163 ea03467c Iustin Pop
  @ivar id: the job ID
164 ea03467c Iustin Pop
  @type ops: list
165 ea03467c Iustin Pop
  @ivar ops: the list of _QueuedOpCode that constitute the job
166 ea03467c Iustin Pop
  @type log_serial: int
167 ea03467c Iustin Pop
  @ivar log_serial: holds the index for the next log entry
168 ea03467c Iustin Pop
  @ivar received_timestamp: the timestamp for when the job was received
169 ea03467c Iustin Pop
  @ivar start_timestmap: the timestamp for start of execution
170 ea03467c Iustin Pop
  @ivar end_timestamp: the timestamp for end of execution
171 e2715f69 Michael Hanselmann

172 e2715f69 Michael Hanselmann
  """
173 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
174 d25c1d6a Michael Hanselmann
  __slots__ = ["queue", "id", "ops", "log_serial",
175 66d895a8 Iustin Pop
               "received_timestamp", "start_timestamp", "end_timestamp",
176 66d895a8 Iustin Pop
               "__weakref__"]
177 66d895a8 Iustin Pop
178 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
179 ea03467c Iustin Pop
    """Constructor for the _QueuedJob.
180 ea03467c Iustin Pop

181 ea03467c Iustin Pop
    @type queue: L{JobQueue}
182 ea03467c Iustin Pop
    @param queue: our parent queue
183 ea03467c Iustin Pop
    @type job_id: job_id
184 ea03467c Iustin Pop
    @param job_id: our job id
185 ea03467c Iustin Pop
    @type ops: list
186 ea03467c Iustin Pop
    @param ops: the list of opcodes we hold, which will be encapsulated
187 ea03467c Iustin Pop
        in _QueuedOpCodes
188 ea03467c Iustin Pop

189 ea03467c Iustin Pop
    """
190 e2715f69 Michael Hanselmann
    if not ops:
191 c910bccb Guido Trotter
      raise errors.GenericError("A job needs at least one opcode")
192 e2715f69 Michael Hanselmann
193 85f03e0d Michael Hanselmann
    self.queue = queue
194 f1da30e6 Michael Hanselmann
    self.id = job_id
195 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
196 6c5a7090 Michael Hanselmann
    self.log_serial = 0
197 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
198 c56ec146 Iustin Pop
    self.start_timestamp = None
199 c56ec146 Iustin Pop
    self.end_timestamp = None
200 6c5a7090 Michael Hanselmann
201 9fa2e150 Michael Hanselmann
  def __repr__(self):
202 9fa2e150 Michael Hanselmann
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
203 9fa2e150 Michael Hanselmann
              "id=%s" % self.id,
204 9fa2e150 Michael Hanselmann
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
205 9fa2e150 Michael Hanselmann
206 9fa2e150 Michael Hanselmann
    return "<%s at %#x>" % (" ".join(status), id(self))
207 9fa2e150 Michael Hanselmann
208 f1da30e6 Michael Hanselmann
  @classmethod
209 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
210 ea03467c Iustin Pop
    """Restore a _QueuedJob from serialized state:
211 ea03467c Iustin Pop

212 ea03467c Iustin Pop
    @type queue: L{JobQueue}
213 ea03467c Iustin Pop
    @param queue: to which queue the restored job belongs
214 ea03467c Iustin Pop
    @type state: dict
215 ea03467c Iustin Pop
    @param state: the serialized state
216 ea03467c Iustin Pop
    @rtype: _JobQueue
217 ea03467c Iustin Pop
    @return: the restored _JobQueue instance
218 ea03467c Iustin Pop

219 ea03467c Iustin Pop
    """
220 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
221 85f03e0d Michael Hanselmann
    obj.queue = queue
222 85f03e0d Michael Hanselmann
    obj.id = state["id"]
223 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
224 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
225 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
226 6c5a7090 Michael Hanselmann
227 6c5a7090 Michael Hanselmann
    obj.ops = []
228 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
229 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
230 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
231 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
232 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
233 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
234 6c5a7090 Michael Hanselmann
235 f1da30e6 Michael Hanselmann
    return obj
236 f1da30e6 Michael Hanselmann
237 f1da30e6 Michael Hanselmann
  def Serialize(self):
238 ea03467c Iustin Pop
    """Serialize the _JobQueue instance.
239 ea03467c Iustin Pop

240 ea03467c Iustin Pop
    @rtype: dict
241 ea03467c Iustin Pop
    @return: the serialized state
242 ea03467c Iustin Pop

243 ea03467c Iustin Pop
    """
244 f1da30e6 Michael Hanselmann
    return {
245 f1da30e6 Michael Hanselmann
      "id": self.id,
246 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
247 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
248 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
249 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
250 f1da30e6 Michael Hanselmann
      }
251 f1da30e6 Michael Hanselmann
252 85f03e0d Michael Hanselmann
  def CalcStatus(self):
253 ea03467c Iustin Pop
    """Compute the status of this job.
254 ea03467c Iustin Pop

255 ea03467c Iustin Pop
    This function iterates over all the _QueuedOpCodes in the job and
256 ea03467c Iustin Pop
    based on their status, computes the job status.
257 ea03467c Iustin Pop

258 ea03467c Iustin Pop
    The algorithm is:
259 ea03467c Iustin Pop
      - if we find a cancelled, or finished with error, the job
260 ea03467c Iustin Pop
        status will be the same
261 ea03467c Iustin Pop
      - otherwise, the last opcode with the status one of:
262 ea03467c Iustin Pop
          - waitlock
263 fbf0262f Michael Hanselmann
          - canceling
264 ea03467c Iustin Pop
          - running
265 ea03467c Iustin Pop

266 ea03467c Iustin Pop
        will determine the job status
267 ea03467c Iustin Pop

268 ea03467c Iustin Pop
      - otherwise, it means either all opcodes are queued, or success,
269 ea03467c Iustin Pop
        and the job status will be the same
270 ea03467c Iustin Pop

271 ea03467c Iustin Pop
    @return: the job status
272 ea03467c Iustin Pop

273 ea03467c Iustin Pop
    """
274 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
275 e2715f69 Michael Hanselmann
276 e2715f69 Michael Hanselmann
    all_success = True
277 85f03e0d Michael Hanselmann
    for op in self.ops:
278 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
279 e2715f69 Michael Hanselmann
        continue
280 e2715f69 Michael Hanselmann
281 e2715f69 Michael Hanselmann
      all_success = False
282 e2715f69 Michael Hanselmann
283 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
284 e2715f69 Michael Hanselmann
        pass
285 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
286 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
287 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
288 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
289 fbf0262f Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELING:
290 fbf0262f Michael Hanselmann
        status = constants.JOB_STATUS_CANCELING
291 fbf0262f Michael Hanselmann
        break
292 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
293 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
294 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
295 f1da30e6 Michael Hanselmann
        break
296 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
297 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
298 4cb1d919 Michael Hanselmann
        break
299 e2715f69 Michael Hanselmann
300 e2715f69 Michael Hanselmann
    if all_success:
301 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
302 e2715f69 Michael Hanselmann
303 e2715f69 Michael Hanselmann
    return status
304 e2715f69 Michael Hanselmann
305 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
306 ea03467c Iustin Pop
    """Selectively returns the log entries.
307 ea03467c Iustin Pop

308 ea03467c Iustin Pop
    @type newer_than: None or int
309 5bbd3f7f Michael Hanselmann
    @param newer_than: if this is None, return all log entries,
310 ea03467c Iustin Pop
        otherwise return only the log entries with serial higher
311 ea03467c Iustin Pop
        than this value
312 ea03467c Iustin Pop
    @rtype: list
313 ea03467c Iustin Pop
    @return: the list of the log entries selected
314 ea03467c Iustin Pop

315 ea03467c Iustin Pop
    """
316 6c5a7090 Michael Hanselmann
    if newer_than is None:
317 6c5a7090 Michael Hanselmann
      serial = -1
318 6c5a7090 Michael Hanselmann
    else:
319 6c5a7090 Michael Hanselmann
      serial = newer_than
320 6c5a7090 Michael Hanselmann
321 6c5a7090 Michael Hanselmann
    entries = []
322 6c5a7090 Michael Hanselmann
    for op in self.ops:
323 63712a09 Iustin Pop
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
324 6c5a7090 Michael Hanselmann
325 6c5a7090 Michael Hanselmann
    return entries
326 6c5a7090 Michael Hanselmann
327 6a290889 Guido Trotter
  def GetInfo(self, fields):
328 6a290889 Guido Trotter
    """Returns information about a job.
329 6a290889 Guido Trotter

330 6a290889 Guido Trotter
    @type fields: list
331 6a290889 Guido Trotter
    @param fields: names of fields to return
332 6a290889 Guido Trotter
    @rtype: list
333 6a290889 Guido Trotter
    @return: list with one element for each field
334 6a290889 Guido Trotter
    @raise errors.OpExecError: when an invalid field
335 6a290889 Guido Trotter
        has been passed
336 6a290889 Guido Trotter

337 6a290889 Guido Trotter
    """
338 6a290889 Guido Trotter
    row = []
339 6a290889 Guido Trotter
    for fname in fields:
340 6a290889 Guido Trotter
      if fname == "id":
341 6a290889 Guido Trotter
        row.append(self.id)
342 6a290889 Guido Trotter
      elif fname == "status":
343 6a290889 Guido Trotter
        row.append(self.CalcStatus())
344 6a290889 Guido Trotter
      elif fname == "ops":
345 6a290889 Guido Trotter
        row.append([op.input.__getstate__() for op in self.ops])
346 6a290889 Guido Trotter
      elif fname == "opresult":
347 6a290889 Guido Trotter
        row.append([op.result for op in self.ops])
348 6a290889 Guido Trotter
      elif fname == "opstatus":
349 6a290889 Guido Trotter
        row.append([op.status for op in self.ops])
350 6a290889 Guido Trotter
      elif fname == "oplog":
351 6a290889 Guido Trotter
        row.append([op.log for op in self.ops])
352 6a290889 Guido Trotter
      elif fname == "opstart":
353 6a290889 Guido Trotter
        row.append([op.start_timestamp for op in self.ops])
354 6a290889 Guido Trotter
      elif fname == "opexec":
355 6a290889 Guido Trotter
        row.append([op.exec_timestamp for op in self.ops])
356 6a290889 Guido Trotter
      elif fname == "opend":
357 6a290889 Guido Trotter
        row.append([op.end_timestamp for op in self.ops])
358 6a290889 Guido Trotter
      elif fname == "received_ts":
359 6a290889 Guido Trotter
        row.append(self.received_timestamp)
360 6a290889 Guido Trotter
      elif fname == "start_ts":
361 6a290889 Guido Trotter
        row.append(self.start_timestamp)
362 6a290889 Guido Trotter
      elif fname == "end_ts":
363 6a290889 Guido Trotter
        row.append(self.end_timestamp)
364 6a290889 Guido Trotter
      elif fname == "summary":
365 6a290889 Guido Trotter
        row.append([op.input.Summary() for op in self.ops])
366 6a290889 Guido Trotter
      else:
367 6a290889 Guido Trotter
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
368 6a290889 Guido Trotter
    return row
369 6a290889 Guido Trotter
370 34327f51 Iustin Pop
  def MarkUnfinishedOps(self, status, result):
371 34327f51 Iustin Pop
    """Mark unfinished opcodes with a given status and result.
372 34327f51 Iustin Pop

373 34327f51 Iustin Pop
    This is an utility function for marking all running or waiting to
374 34327f51 Iustin Pop
    be run opcodes with a given status. Opcodes which are already
375 34327f51 Iustin Pop
    finalised are not changed.
376 34327f51 Iustin Pop

377 34327f51 Iustin Pop
    @param status: a given opcode status
378 34327f51 Iustin Pop
    @param result: the opcode result
379 34327f51 Iustin Pop

380 34327f51 Iustin Pop
    """
381 39ed3a98 Guido Trotter
    try:
382 39ed3a98 Guido Trotter
      not_marked = True
383 39ed3a98 Guido Trotter
      for op in self.ops:
384 39ed3a98 Guido Trotter
        if op.status in constants.OPS_FINALIZED:
385 39ed3a98 Guido Trotter
          assert not_marked, "Finalized opcodes found after non-finalized ones"
386 39ed3a98 Guido Trotter
          continue
387 39ed3a98 Guido Trotter
        op.status = status
388 39ed3a98 Guido Trotter
        op.result = result
389 39ed3a98 Guido Trotter
        not_marked = False
390 39ed3a98 Guido Trotter
    finally:
391 39ed3a98 Guido Trotter
      self.queue.UpdateJobUnlocked(self)
392 34327f51 Iustin Pop
393 f1048938 Iustin Pop
394 ef2df7d3 Michael Hanselmann
class _OpExecCallbacks(mcpu.OpExecCbBase):
395 031a3e57 Michael Hanselmann
  def __init__(self, queue, job, op):
396 031a3e57 Michael Hanselmann
    """Initializes this class.
397 ea03467c Iustin Pop

398 031a3e57 Michael Hanselmann
    @type queue: L{JobQueue}
399 031a3e57 Michael Hanselmann
    @param queue: Job queue
400 031a3e57 Michael Hanselmann
    @type job: L{_QueuedJob}
401 031a3e57 Michael Hanselmann
    @param job: Job object
402 031a3e57 Michael Hanselmann
    @type op: L{_QueuedOpCode}
403 031a3e57 Michael Hanselmann
    @param op: OpCode
404 031a3e57 Michael Hanselmann

405 031a3e57 Michael Hanselmann
    """
406 031a3e57 Michael Hanselmann
    assert queue, "Queue is missing"
407 031a3e57 Michael Hanselmann
    assert job, "Job is missing"
408 031a3e57 Michael Hanselmann
    assert op, "Opcode is missing"
409 031a3e57 Michael Hanselmann
410 031a3e57 Michael Hanselmann
    self._queue = queue
411 031a3e57 Michael Hanselmann
    self._job = job
412 031a3e57 Michael Hanselmann
    self._op = op
413 031a3e57 Michael Hanselmann
414 dc1e2262 Michael Hanselmann
  def _CheckCancel(self):
415 dc1e2262 Michael Hanselmann
    """Raises an exception to cancel the job if asked to.
416 dc1e2262 Michael Hanselmann

417 dc1e2262 Michael Hanselmann
    """
418 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
419 dc1e2262 Michael Hanselmann
    if self._op.status == constants.OP_STATUS_CANCELING:
420 dc1e2262 Michael Hanselmann
      logging.debug("Canceling opcode")
421 dc1e2262 Michael Hanselmann
      raise CancelJob()
422 dc1e2262 Michael Hanselmann
423 271daef8 Iustin Pop
  @locking.ssynchronized(_QUEUE, shared=1)
424 031a3e57 Michael Hanselmann
  def NotifyStart(self):
425 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
426 e92376d7 Iustin Pop

427 031a3e57 Michael Hanselmann
    This is called from the mcpu code as a notifier function, when the LU is
428 031a3e57 Michael Hanselmann
    finally about to start the Exec() method. Of course, to have end-user
429 031a3e57 Michael Hanselmann
    visible results, the opcode must be initially (before calling into
430 031a3e57 Michael Hanselmann
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
431 e92376d7 Iustin Pop

432 e92376d7 Iustin Pop
    """
433 9bdab621 Michael Hanselmann
    assert self._op in self._job.ops
434 271daef8 Iustin Pop
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
435 271daef8 Iustin Pop
                               constants.OP_STATUS_CANCELING)
436 fbf0262f Michael Hanselmann
437 271daef8 Iustin Pop
    # Cancel here if we were asked to
438 dc1e2262 Michael Hanselmann
    self._CheckCancel()
439 fbf0262f Michael Hanselmann
440 e35344b4 Michael Hanselmann
    logging.debug("Opcode is now running")
441 9bdab621 Michael Hanselmann
442 271daef8 Iustin Pop
    self._op.status = constants.OP_STATUS_RUNNING
443 271daef8 Iustin Pop
    self._op.exec_timestamp = TimeStampNow()
444 271daef8 Iustin Pop
445 271daef8 Iustin Pop
    # And finally replicate the job status
446 271daef8 Iustin Pop
    self._queue.UpdateJobUnlocked(self._job)
447 031a3e57 Michael Hanselmann
448 ebb80afa Guido Trotter
  @locking.ssynchronized(_QUEUE, shared=1)
449 9bf5e01f Guido Trotter
  def _AppendFeedback(self, timestamp, log_type, log_msg):
450 9bf5e01f Guido Trotter
    """Internal feedback append function, with locks
451 9bf5e01f Guido Trotter

452 9bf5e01f Guido Trotter
    """
453 9bf5e01f Guido Trotter
    self._job.log_serial += 1
454 9bf5e01f Guido Trotter
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
455 9bf5e01f Guido Trotter
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
456 9bf5e01f Guido Trotter
457 031a3e57 Michael Hanselmann
  def Feedback(self, *args):
458 031a3e57 Michael Hanselmann
    """Append a log entry.
459 031a3e57 Michael Hanselmann

460 031a3e57 Michael Hanselmann
    """
461 031a3e57 Michael Hanselmann
    assert len(args) < 3
462 031a3e57 Michael Hanselmann
463 031a3e57 Michael Hanselmann
    if len(args) == 1:
464 031a3e57 Michael Hanselmann
      log_type = constants.ELOG_MESSAGE
465 031a3e57 Michael Hanselmann
      log_msg = args[0]
466 031a3e57 Michael Hanselmann
    else:
467 031a3e57 Michael Hanselmann
      (log_type, log_msg) = args
468 031a3e57 Michael Hanselmann
469 031a3e57 Michael Hanselmann
    # The time is split to make serialization easier and not lose
470 031a3e57 Michael Hanselmann
    # precision.
471 031a3e57 Michael Hanselmann
    timestamp = utils.SplitTime(time.time())
472 9bf5e01f Guido Trotter
    self._AppendFeedback(timestamp, log_type, log_msg)
473 031a3e57 Michael Hanselmann
474 ef2df7d3 Michael Hanselmann
  def ReportLocks(self, msg):
475 ef2df7d3 Michael Hanselmann
    """Write locking information to the job.
476 ef2df7d3 Michael Hanselmann

477 ef2df7d3 Michael Hanselmann
    Called whenever the LU processor is waiting for a lock or has acquired one.
478 ef2df7d3 Michael Hanselmann

479 ef2df7d3 Michael Hanselmann
    """
480 dc1e2262 Michael Hanselmann
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
481 dc1e2262 Michael Hanselmann
                               constants.OP_STATUS_CANCELING)
482 dc1e2262 Michael Hanselmann
483 dc1e2262 Michael Hanselmann
    # Cancel here if we were asked to
484 dc1e2262 Michael Hanselmann
    self._CheckCancel()
485 dc1e2262 Michael Hanselmann
486 031a3e57 Michael Hanselmann
487 989a8bee Michael Hanselmann
class _JobChangesChecker(object):
488 989a8bee Michael Hanselmann
  def __init__(self, fields, prev_job_info, prev_log_serial):
489 989a8bee Michael Hanselmann
    """Initializes this class.
490 6c2549d6 Guido Trotter

491 989a8bee Michael Hanselmann
    @type fields: list of strings
492 989a8bee Michael Hanselmann
    @param fields: Fields requested by LUXI client
493 989a8bee Michael Hanselmann
    @type prev_job_info: string
494 989a8bee Michael Hanselmann
    @param prev_job_info: previous job info, as passed by the LUXI client
495 989a8bee Michael Hanselmann
    @type prev_log_serial: string
496 989a8bee Michael Hanselmann
    @param prev_log_serial: previous job serial, as passed by the LUXI client
497 6c2549d6 Guido Trotter

498 989a8bee Michael Hanselmann
    """
499 989a8bee Michael Hanselmann
    self._fields = fields
500 989a8bee Michael Hanselmann
    self._prev_job_info = prev_job_info
501 989a8bee Michael Hanselmann
    self._prev_log_serial = prev_log_serial
502 6c2549d6 Guido Trotter
503 989a8bee Michael Hanselmann
  def __call__(self, job):
504 989a8bee Michael Hanselmann
    """Checks whether job has changed.
505 6c2549d6 Guido Trotter

506 989a8bee Michael Hanselmann
    @type job: L{_QueuedJob}
507 989a8bee Michael Hanselmann
    @param job: Job object
508 6c2549d6 Guido Trotter

509 6c2549d6 Guido Trotter
    """
510 989a8bee Michael Hanselmann
    status = job.CalcStatus()
511 989a8bee Michael Hanselmann
    job_info = job.GetInfo(self._fields)
512 989a8bee Michael Hanselmann
    log_entries = job.GetLogEntries(self._prev_log_serial)
513 6c2549d6 Guido Trotter
514 6c2549d6 Guido Trotter
    # Serializing and deserializing data can cause type changes (e.g. from
515 6c2549d6 Guido Trotter
    # tuple to list) or precision loss. We're doing it here so that we get
516 6c2549d6 Guido Trotter
    # the same modifications as the data received from the client. Without
517 6c2549d6 Guido Trotter
    # this, the comparison afterwards might fail without the data being
518 6c2549d6 Guido Trotter
    # significantly different.
519 6c2549d6 Guido Trotter
    # TODO: we just deserialized from disk, investigate how to make sure that
520 6c2549d6 Guido Trotter
    # the job info and log entries are compatible to avoid this further step.
521 989a8bee Michael Hanselmann
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
522 989a8bee Michael Hanselmann
    # efficient, though floats will be tricky
523 989a8bee Michael Hanselmann
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
524 989a8bee Michael Hanselmann
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
525 6c2549d6 Guido Trotter
526 6c2549d6 Guido Trotter
    # Don't even try to wait if the job is no longer running, there will be
527 6c2549d6 Guido Trotter
    # no changes.
528 989a8bee Michael Hanselmann
    if (status not in (constants.JOB_STATUS_QUEUED,
529 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_RUNNING,
530 989a8bee Michael Hanselmann
                       constants.JOB_STATUS_WAITLOCK) or
531 989a8bee Michael Hanselmann
        job_info != self._prev_job_info or
532 989a8bee Michael Hanselmann
        (log_entries and self._prev_log_serial != log_entries[0][0])):
533 989a8bee Michael Hanselmann
      logging.debug("Job %s changed", job.id)
534 989a8bee Michael Hanselmann
      return (job_info, log_entries)
535 6c2549d6 Guido Trotter
536 989a8bee Michael Hanselmann
    return None
537 989a8bee Michael Hanselmann
538 989a8bee Michael Hanselmann
539 989a8bee Michael Hanselmann
class _JobFileChangesWaiter(object):
540 989a8bee Michael Hanselmann
  def __init__(self, filename):
541 989a8bee Michael Hanselmann
    """Initializes this class.
542 989a8bee Michael Hanselmann

543 989a8bee Michael Hanselmann
    @type filename: string
544 989a8bee Michael Hanselmann
    @param filename: Path to job file
545 989a8bee Michael Hanselmann
    @raises errors.InotifyError: if the notifier cannot be setup
546 6c2549d6 Guido Trotter

547 989a8bee Michael Hanselmann
    """
548 989a8bee Michael Hanselmann
    self._wm = pyinotify.WatchManager()
549 989a8bee Michael Hanselmann
    self._inotify_handler = \
550 989a8bee Michael Hanselmann
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
551 989a8bee Michael Hanselmann
    self._notifier = \
552 989a8bee Michael Hanselmann
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
553 989a8bee Michael Hanselmann
    try:
554 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
555 989a8bee Michael Hanselmann
    except Exception:
556 989a8bee Michael Hanselmann
      # pyinotify doesn't close file descriptors automatically
557 989a8bee Michael Hanselmann
      self._notifier.stop()
558 989a8bee Michael Hanselmann
      raise
559 989a8bee Michael Hanselmann
560 989a8bee Michael Hanselmann
  def _OnInotify(self, notifier_enabled):
561 989a8bee Michael Hanselmann
    """Callback for inotify.
562 989a8bee Michael Hanselmann

563 989a8bee Michael Hanselmann
    """
564 6c2549d6 Guido Trotter
    if not notifier_enabled:
565 989a8bee Michael Hanselmann
      self._inotify_handler.enable()
566 989a8bee Michael Hanselmann
567 989a8bee Michael Hanselmann
  def Wait(self, timeout):
568 989a8bee Michael Hanselmann
    """Waits for the job file to change.
569 989a8bee Michael Hanselmann

570 989a8bee Michael Hanselmann
    @type timeout: float
571 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
572 989a8bee Michael Hanselmann
    @return: Whether there have been events
573 989a8bee Michael Hanselmann

574 989a8bee Michael Hanselmann
    """
575 989a8bee Michael Hanselmann
    assert timeout >= 0
576 989a8bee Michael Hanselmann
    have_events = self._notifier.check_events(timeout * 1000)
577 989a8bee Michael Hanselmann
    if have_events:
578 989a8bee Michael Hanselmann
      self._notifier.read_events()
579 989a8bee Michael Hanselmann
    self._notifier.process_events()
580 989a8bee Michael Hanselmann
    return have_events
581 989a8bee Michael Hanselmann
582 989a8bee Michael Hanselmann
  def Close(self):
583 989a8bee Michael Hanselmann
    """Closes underlying notifier and its file descriptor.
584 989a8bee Michael Hanselmann

585 989a8bee Michael Hanselmann
    """
586 989a8bee Michael Hanselmann
    self._notifier.stop()
587 989a8bee Michael Hanselmann
588 989a8bee Michael Hanselmann
589 989a8bee Michael Hanselmann
class _JobChangesWaiter(object):
590 989a8bee Michael Hanselmann
  def __init__(self, filename):
591 989a8bee Michael Hanselmann
    """Initializes this class.
592 989a8bee Michael Hanselmann

593 989a8bee Michael Hanselmann
    @type filename: string
594 989a8bee Michael Hanselmann
    @param filename: Path to job file
595 989a8bee Michael Hanselmann

596 989a8bee Michael Hanselmann
    """
597 989a8bee Michael Hanselmann
    self._filewaiter = None
598 989a8bee Michael Hanselmann
    self._filename = filename
599 6c2549d6 Guido Trotter
600 989a8bee Michael Hanselmann
  def Wait(self, timeout):
601 989a8bee Michael Hanselmann
    """Waits for a job to change.
602 6c2549d6 Guido Trotter

603 989a8bee Michael Hanselmann
    @type timeout: float
604 989a8bee Michael Hanselmann
    @param timeout: Timeout in seconds
605 989a8bee Michael Hanselmann
    @return: Whether there have been events
606 989a8bee Michael Hanselmann

607 989a8bee Michael Hanselmann
    """
608 989a8bee Michael Hanselmann
    if self._filewaiter:
609 989a8bee Michael Hanselmann
      return self._filewaiter.Wait(timeout)
610 989a8bee Michael Hanselmann
611 989a8bee Michael Hanselmann
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
612 989a8bee Michael Hanselmann
    # If this point is reached, return immediately and let caller check the job
613 989a8bee Michael Hanselmann
    # file again in case there were changes since the last check. This avoids a
614 989a8bee Michael Hanselmann
    # race condition.
615 989a8bee Michael Hanselmann
    self._filewaiter = _JobFileChangesWaiter(self._filename)
616 989a8bee Michael Hanselmann
617 989a8bee Michael Hanselmann
    return True
618 989a8bee Michael Hanselmann
619 989a8bee Michael Hanselmann
  def Close(self):
620 989a8bee Michael Hanselmann
    """Closes underlying waiter.
621 989a8bee Michael Hanselmann

622 989a8bee Michael Hanselmann
    """
623 989a8bee Michael Hanselmann
    if self._filewaiter:
624 989a8bee Michael Hanselmann
      self._filewaiter.Close()
625 989a8bee Michael Hanselmann
626 989a8bee Michael Hanselmann
627 989a8bee Michael Hanselmann
class _WaitForJobChangesHelper(object):
628 989a8bee Michael Hanselmann
  """Helper class using inotify to wait for changes in a job file.
629 989a8bee Michael Hanselmann

630 989a8bee Michael Hanselmann
  This class takes a previous job status and serial, and alerts the client when
631 989a8bee Michael Hanselmann
  the current job status has changed.
632 989a8bee Michael Hanselmann

633 989a8bee Michael Hanselmann
  """
634 989a8bee Michael Hanselmann
  @staticmethod
635 989a8bee Michael Hanselmann
  def _CheckForChanges(job_load_fn, check_fn):
636 989a8bee Michael Hanselmann
    job = job_load_fn()
637 989a8bee Michael Hanselmann
    if not job:
638 989a8bee Michael Hanselmann
      raise errors.JobLost()
639 989a8bee Michael Hanselmann
640 989a8bee Michael Hanselmann
    result = check_fn(job)
641 989a8bee Michael Hanselmann
    if result is None:
642 989a8bee Michael Hanselmann
      raise utils.RetryAgain()
643 989a8bee Michael Hanselmann
644 989a8bee Michael Hanselmann
    return result
645 989a8bee Michael Hanselmann
646 989a8bee Michael Hanselmann
  def __call__(self, filename, job_load_fn,
647 989a8bee Michael Hanselmann
               fields, prev_job_info, prev_log_serial, timeout):
648 989a8bee Michael Hanselmann
    """Waits for changes on a job.
649 989a8bee Michael Hanselmann

650 989a8bee Michael Hanselmann
    @type filename: string
651 989a8bee Michael Hanselmann
    @param filename: File on which to wait for changes
652 989a8bee Michael Hanselmann
    @type job_load_fn: callable
653 989a8bee Michael Hanselmann
    @param job_load_fn: Function to load job
654 989a8bee Michael Hanselmann
    @type fields: list of strings
655 989a8bee Michael Hanselmann
    @param fields: Which fields to check for changes
656 989a8bee Michael Hanselmann
    @type prev_job_info: list or None
657 989a8bee Michael Hanselmann
    @param prev_job_info: Last job information returned
658 989a8bee Michael Hanselmann
    @type prev_log_serial: int
659 989a8bee Michael Hanselmann
    @param prev_log_serial: Last job message serial number
660 989a8bee Michael Hanselmann
    @type timeout: float
661 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
662 989a8bee Michael Hanselmann

663 989a8bee Michael Hanselmann
    """
664 6c2549d6 Guido Trotter
    try:
665 989a8bee Michael Hanselmann
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
666 989a8bee Michael Hanselmann
      waiter = _JobChangesWaiter(filename)
667 989a8bee Michael Hanselmann
      try:
668 989a8bee Michael Hanselmann
        return utils.Retry(compat.partial(self._CheckForChanges,
669 989a8bee Michael Hanselmann
                                          job_load_fn, check_fn),
670 989a8bee Michael Hanselmann
                           utils.RETRY_REMAINING_TIME, timeout,
671 989a8bee Michael Hanselmann
                           wait_fn=waiter.Wait)
672 989a8bee Michael Hanselmann
      finally:
673 989a8bee Michael Hanselmann
        waiter.Close()
674 6c2549d6 Guido Trotter
    except (errors.InotifyError, errors.JobLost):
675 6c2549d6 Guido Trotter
      return None
676 6c2549d6 Guido Trotter
    except utils.RetryTimeout:
677 6c2549d6 Guido Trotter
      return constants.JOB_NOTCHANGED
678 6c2549d6 Guido Trotter
679 6c2549d6 Guido Trotter
680 6760e4ed Michael Hanselmann
def _EncodeOpError(err):
681 6760e4ed Michael Hanselmann
  """Encodes an error which occurred while processing an opcode.
682 6760e4ed Michael Hanselmann

683 6760e4ed Michael Hanselmann
  """
684 6760e4ed Michael Hanselmann
  if isinstance(err, errors.GenericError):
685 6760e4ed Michael Hanselmann
    to_encode = err
686 6760e4ed Michael Hanselmann
  else:
687 6760e4ed Michael Hanselmann
    to_encode = errors.OpExecError(str(err))
688 6760e4ed Michael Hanselmann
689 6760e4ed Michael Hanselmann
  return errors.EncodeException(to_encode)
690 6760e4ed Michael Hanselmann
691 6760e4ed Michael Hanselmann
692 031a3e57 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
693 031a3e57 Michael Hanselmann
  """The actual job workers.
694 031a3e57 Michael Hanselmann

695 031a3e57 Michael Hanselmann
  """
696 7260cfbe Iustin Pop
  def RunTask(self, job): # pylint: disable-msg=W0221
697 e2715f69 Michael Hanselmann
    """Job executor.
698 e2715f69 Michael Hanselmann

699 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
700 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
701 e2715f69 Michael Hanselmann

702 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
703 ea03467c Iustin Pop
    @param job: the job to be processed
704 ea03467c Iustin Pop

705 e2715f69 Michael Hanselmann
    """
706 daba67c7 Michael Hanselmann
    self.SetTaskName("Job%s" % job.id)
707 daba67c7 Michael Hanselmann
708 02fc74da Michael Hanselmann
    logging.info("Processing job %s", job.id)
709 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.pool.queue.context, job.id)
710 031a3e57 Michael Hanselmann
    queue = job.queue
711 e2715f69 Michael Hanselmann
    try:
712 85f03e0d Michael Hanselmann
      try:
713 85f03e0d Michael Hanselmann
        count = len(job.ops)
714 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
715 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
716 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
717 f6424741 Iustin Pop
            # this is a job that was partially completed before master
718 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
719 f6424741 Iustin Pop
            # are already completed successfully (if any did error
720 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
721 f6424741 Iustin Pop
            # resubmitted for processing)
722 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
723 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
724 f6424741 Iustin Pop
            continue
725 85f03e0d Michael Hanselmann
          try:
726 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
727 d21d09d6 Iustin Pop
                         op_summary)
728 85f03e0d Michael Hanselmann
729 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
730 85f03e0d Michael Hanselmann
            try:
731 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
732 e35344b4 Michael Hanselmann
                logging.debug("Canceling opcode")
733 df0fb067 Iustin Pop
                raise CancelJob()
734 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
735 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s waiting for locks",
736 e35344b4 Michael Hanselmann
                            idx + 1, count)
737 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
738 85f03e0d Michael Hanselmann
              op.result = None
739 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
740 c56ec146 Iustin Pop
              if idx == 0: # first opcode
741 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
742 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
743 85f03e0d Michael Hanselmann
744 38206f3c Iustin Pop
              input_opcode = op.input
745 85f03e0d Michael Hanselmann
            finally:
746 85f03e0d Michael Hanselmann
              queue.release()
747 85f03e0d Michael Hanselmann
748 031a3e57 Michael Hanselmann
            # Make sure not to hold queue lock while calling ExecOpCode
749 031a3e57 Michael Hanselmann
            result = proc.ExecOpCode(input_opcode,
750 ef2df7d3 Michael Hanselmann
                                     _OpExecCallbacks(queue, job, op))
751 85f03e0d Michael Hanselmann
752 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
753 85f03e0d Michael Hanselmann
            try:
754 e35344b4 Michael Hanselmann
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
755 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
756 85f03e0d Michael Hanselmann
              op.result = result
757 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
758 6ea72e43 Michael Hanselmann
              if idx == count - 1:
759 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
760 963a068b Michael Hanselmann
761 963a068b Michael Hanselmann
                # Consistency check
762 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
763 963a068b Michael Hanselmann
                                  for i in job.ops)
764 963a068b Michael Hanselmann
765 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
766 85f03e0d Michael Hanselmann
            finally:
767 85f03e0d Michael Hanselmann
              queue.release()
768 85f03e0d Michael Hanselmann
769 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
770 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
771 fbf0262f Michael Hanselmann
          except CancelJob:
772 fbf0262f Michael Hanselmann
            # Will be handled further up
773 fbf0262f Michael Hanselmann
            raise
774 85f03e0d Michael Hanselmann
          except Exception, err:
775 3c0d60d0 Guido Trotter
            queue.acquire(shared=1)
776 85f03e0d Michael Hanselmann
            try:
777 85f03e0d Michael Hanselmann
              try:
778 e35344b4 Michael Hanselmann
                logging.debug("Opcode %s/%s failed", idx + 1, count)
779 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
780 6760e4ed Michael Hanselmann
                op.result = _EncodeOpError(err)
781 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
782 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
783 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
784 963a068b Michael Hanselmann
785 963a068b Michael Hanselmann
                to_encode = errors.OpExecError("Preceding opcode failed")
786 963a068b Michael Hanselmann
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
787 6760e4ed Michael Hanselmann
                                      _EncodeOpError(to_encode))
788 963a068b Michael Hanselmann
789 963a068b Michael Hanselmann
                # Consistency check
790 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
791 963a068b Michael Hanselmann
                                  for i in job.ops[:idx])
792 963a068b Michael Hanselmann
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
793 963a068b Michael Hanselmann
                                  errors.GetEncodedError(i.result)
794 963a068b Michael Hanselmann
                                  for i in job.ops[idx:])
795 85f03e0d Michael Hanselmann
              finally:
796 6ea72e43 Michael Hanselmann
                job.end_timestamp = TimeStampNow()
797 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
798 85f03e0d Michael Hanselmann
            finally:
799 85f03e0d Michael Hanselmann
              queue.release()
800 85f03e0d Michael Hanselmann
            raise
801 85f03e0d Michael Hanselmann
802 fbf0262f Michael Hanselmann
      except CancelJob:
803 3c0d60d0 Guido Trotter
        queue.acquire(shared=1)
804 fbf0262f Michael Hanselmann
        try:
805 39ed3a98 Guido Trotter
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
806 39ed3a98 Guido Trotter
                                "Job canceled by request")
807 6ea72e43 Michael Hanselmann
          job.end_timestamp = TimeStampNow()
808 6ea72e43 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
809 fbf0262f Michael Hanselmann
        finally:
810 fbf0262f Michael Hanselmann
          queue.release()
811 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
812 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
813 85f03e0d Michael Hanselmann
      except:
814 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
815 e2715f69 Michael Hanselmann
    finally:
816 6ea72e43 Michael Hanselmann
      status = job.CalcStatus()
817 6ea72e43 Michael Hanselmann
      logging.info("Finished job %s, status = %s", job.id, status)
818 e2715f69 Michael Hanselmann
819 e2715f69 Michael Hanselmann
820 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
821 ea03467c Iustin Pop
  """Simple class implementing a job-processing workerpool.
822 ea03467c Iustin Pop

823 ea03467c Iustin Pop
  """
824 5bdce580 Michael Hanselmann
  def __init__(self, queue):
825 89e2b4d2 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
826 89e2b4d2 Michael Hanselmann
                                              JOBQUEUE_THREADS,
827 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
828 5bdce580 Michael Hanselmann
    self.queue = queue
829 e2715f69 Michael Hanselmann
830 e2715f69 Michael Hanselmann
831 6c881c52 Iustin Pop
def _RequireOpenQueue(fn):
832 6c881c52 Iustin Pop
  """Decorator for "public" functions.
833 ea03467c Iustin Pop

834 6c881c52 Iustin Pop
  This function should be used for all 'public' functions. That is,
835 6c881c52 Iustin Pop
  functions usually called from other classes. Note that this should
836 6c881c52 Iustin Pop
  be applied only to methods (not plain functions), since it expects
837 6c881c52 Iustin Pop
  that the decorated function is called with a first argument that has
838 a71f9c7d Guido Trotter
  a '_queue_filelock' argument.
839 ea03467c Iustin Pop

840 99bd4f0a Guido Trotter
  @warning: Use this decorator only after locking.ssynchronized
841 f1da30e6 Michael Hanselmann

842 6c881c52 Iustin Pop
  Example::
843 ebb80afa Guido Trotter
    @locking.ssynchronized(_LOCK)
844 6c881c52 Iustin Pop
    @_RequireOpenQueue
845 6c881c52 Iustin Pop
    def Example(self):
846 6c881c52 Iustin Pop
      pass
847 db37da70 Michael Hanselmann

848 6c881c52 Iustin Pop
  """
849 6c881c52 Iustin Pop
  def wrapper(self, *args, **kwargs):
850 7260cfbe Iustin Pop
    # pylint: disable-msg=W0212
851 a71f9c7d Guido Trotter
    assert self._queue_filelock is not None, "Queue should be open"
852 6c881c52 Iustin Pop
    return fn(self, *args, **kwargs)
853 6c881c52 Iustin Pop
  return wrapper
854 db37da70 Michael Hanselmann
855 db37da70 Michael Hanselmann
856 6c881c52 Iustin Pop
class JobQueue(object):
857 6c881c52 Iustin Pop
  """Queue used to manage the jobs.
858 db37da70 Michael Hanselmann

859 6c881c52 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
860 6c881c52 Iustin Pop

861 6c881c52 Iustin Pop
  """
862 6c881c52 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
863 db37da70 Michael Hanselmann
864 85f03e0d Michael Hanselmann
  def __init__(self, context):
865 ea03467c Iustin Pop
    """Constructor for JobQueue.
866 ea03467c Iustin Pop

867 ea03467c Iustin Pop
    The constructor will initialize the job queue object and then
868 ea03467c Iustin Pop
    start loading the current jobs from disk, either for starting them
869 ea03467c Iustin Pop
    (if they were queue) or for aborting them (if they were already
870 ea03467c Iustin Pop
    running).
871 ea03467c Iustin Pop

872 ea03467c Iustin Pop
    @type context: GanetiContext
873 ea03467c Iustin Pop
    @param context: the context object for access to the configuration
874 ea03467c Iustin Pop
        data and other ganeti objects
875 ea03467c Iustin Pop

876 ea03467c Iustin Pop
    """
877 5bdce580 Michael Hanselmann
    self.context = context
878 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
879 b705c7a6 Manuel Franceschini
    self._my_hostname = netutils.Hostname.GetSysName()
880 f1da30e6 Michael Hanselmann
881 ebb80afa Guido Trotter
    # The Big JobQueue lock. If a code block or method acquires it in shared
882 ebb80afa Guido Trotter
    # mode safe it must guarantee concurrency with all the code acquiring it in
883 ebb80afa Guido Trotter
    # shared mode, including itself. In order not to acquire it at all
884 ebb80afa Guido Trotter
    # concurrency must be guaranteed with all code acquiring it in shared mode
885 ebb80afa Guido Trotter
    # and all code acquiring it exclusively.
886 7f93570a Iustin Pop
    self._lock = locking.SharedLock("JobQueue")
887 ebb80afa Guido Trotter
888 ebb80afa Guido Trotter
    self.acquire = self._lock.acquire
889 ebb80afa Guido Trotter
    self.release = self._lock.release
890 85f03e0d Michael Hanselmann
891 a71f9c7d Guido Trotter
    # Initialize the queue, and acquire the filelock.
892 a71f9c7d Guido Trotter
    # This ensures no other process is working on the job queue.
893 a71f9c7d Guido Trotter
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
894 f1da30e6 Michael Hanselmann
895 04ab05ce Michael Hanselmann
    # Read serial file
896 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
897 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
898 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
899 c4beba1c Iustin Pop
900 23752136 Michael Hanselmann
    # Get initial list of nodes
901 99aabbed Iustin Pop
    self._nodes = dict((n.name, n.primary_ip)
902 59303563 Iustin Pop
                       for n in self.context.cfg.GetAllNodesInfo().values()
903 59303563 Iustin Pop
                       if n.master_candidate)
904 8e00939c Michael Hanselmann
905 8e00939c Michael Hanselmann
    # Remove master node
906 d8e0dc17 Guido Trotter
    self._nodes.pop(self._my_hostname, None)
907 23752136 Michael Hanselmann
908 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
909 23752136 Michael Hanselmann
910 20571a26 Guido Trotter
    self._queue_size = 0
911 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
912 20571a26 Guido Trotter
    self._drained = self._IsQueueMarkedDrain()
913 20571a26 Guido Trotter
914 85f03e0d Michael Hanselmann
    # Setup worker pool
915 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
916 85f03e0d Michael Hanselmann
    try:
917 16714921 Michael Hanselmann
      # We need to lock here because WorkerPool.AddTask() may start a job while
918 16714921 Michael Hanselmann
      # we're still doing our work.
919 16714921 Michael Hanselmann
      self.acquire()
920 16714921 Michael Hanselmann
      try:
921 711b5124 Michael Hanselmann
        logging.info("Inspecting job queue")
922 711b5124 Michael Hanselmann
923 711b5124 Michael Hanselmann
        all_job_ids = self._GetJobIDsUnlocked()
924 b7cb9024 Michael Hanselmann
        jobs_count = len(all_job_ids)
925 711b5124 Michael Hanselmann
        lastinfo = time.time()
926 711b5124 Michael Hanselmann
        for idx, job_id in enumerate(all_job_ids):
927 711b5124 Michael Hanselmann
          # Give an update every 1000 jobs or 10 seconds
928 b7cb9024 Michael Hanselmann
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
929 b7cb9024 Michael Hanselmann
              idx == (jobs_count - 1)):
930 711b5124 Michael Hanselmann
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
931 b7cb9024 Michael Hanselmann
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
932 711b5124 Michael Hanselmann
            lastinfo = time.time()
933 711b5124 Michael Hanselmann
934 711b5124 Michael Hanselmann
          job = self._LoadJobUnlocked(job_id)
935 711b5124 Michael Hanselmann
936 16714921 Michael Hanselmann
          # a failure in loading the job can cause 'None' to be returned
937 16714921 Michael Hanselmann
          if job is None:
938 16714921 Michael Hanselmann
            continue
939 94ed59a5 Iustin Pop
940 16714921 Michael Hanselmann
          status = job.CalcStatus()
941 85f03e0d Michael Hanselmann
942 16714921 Michael Hanselmann
          if status in (constants.JOB_STATUS_QUEUED, ):
943 b2e8a4d9 Michael Hanselmann
            self._wpool.AddTask((job, ))
944 85f03e0d Michael Hanselmann
945 16714921 Michael Hanselmann
          elif status in (constants.JOB_STATUS_RUNNING,
946 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK,
947 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_CANCELING):
948 16714921 Michael Hanselmann
            logging.warning("Unfinished job %s found: %s", job.id, job)
949 39ed3a98 Guido Trotter
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
950 39ed3a98 Guido Trotter
                                  "Unclean master daemon shutdown")
951 711b5124 Michael Hanselmann
952 711b5124 Michael Hanselmann
        logging.info("Job queue inspection finished")
953 16714921 Michael Hanselmann
      finally:
954 16714921 Michael Hanselmann
        self.release()
955 16714921 Michael Hanselmann
    except:
956 16714921 Michael Hanselmann
      self._wpool.TerminateWorkers()
957 16714921 Michael Hanselmann
      raise
958 85f03e0d Michael Hanselmann
959 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
960 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
961 99aabbed Iustin Pop
  def AddNode(self, node):
962 99aabbed Iustin Pop
    """Register a new node with the queue.
963 99aabbed Iustin Pop

964 99aabbed Iustin Pop
    @type node: L{objects.Node}
965 99aabbed Iustin Pop
    @param node: the node object to be added
966 99aabbed Iustin Pop

967 99aabbed Iustin Pop
    """
968 99aabbed Iustin Pop
    node_name = node.name
969 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
970 23752136 Michael Hanselmann
971 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
972 c8457ce7 Iustin Pop
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
973 3cebe102 Michael Hanselmann
    msg = result.fail_msg
974 c8457ce7 Iustin Pop
    if msg:
975 c8457ce7 Iustin Pop
      logging.warning("Cannot cleanup queue directory on node %s: %s",
976 c8457ce7 Iustin Pop
                      node_name, msg)
977 23752136 Michael Hanselmann
978 59303563 Iustin Pop
    if not node.master_candidate:
979 59303563 Iustin Pop
      # remove if existing, ignoring errors
980 59303563 Iustin Pop
      self._nodes.pop(node_name, None)
981 59303563 Iustin Pop
      # and skip the replication of the job ids
982 59303563 Iustin Pop
      return
983 59303563 Iustin Pop
984 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
985 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
986 23752136 Michael Hanselmann
987 d2e03a33 Michael Hanselmann
    # Upload current serial file
988 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
989 d2e03a33 Michael Hanselmann
990 d2e03a33 Michael Hanselmann
    for file_name in files:
991 9f774ee8 Michael Hanselmann
      # Read file content
992 13998ef2 Michael Hanselmann
      content = utils.ReadFile(file_name)
993 9f774ee8 Michael Hanselmann
994 a3811745 Michael Hanselmann
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
995 a3811745 Michael Hanselmann
                                                  [node.primary_ip],
996 a3811745 Michael Hanselmann
                                                  file_name, content)
997 3cebe102 Michael Hanselmann
      msg = result[node_name].fail_msg
998 c8457ce7 Iustin Pop
      if msg:
999 c8457ce7 Iustin Pop
        logging.error("Failed to upload file %s to node %s: %s",
1000 c8457ce7 Iustin Pop
                      file_name, node_name, msg)
1001 d2e03a33 Michael Hanselmann
1002 99aabbed Iustin Pop
    self._nodes[node_name] = node.primary_ip
1003 d2e03a33 Michael Hanselmann
1004 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1005 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
1006 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
1007 ea03467c Iustin Pop
    """Callback called when removing nodes from the cluster.
1008 ea03467c Iustin Pop

1009 ea03467c Iustin Pop
    @type node_name: str
1010 ea03467c Iustin Pop
    @param node_name: the name of the node to remove
1011 ea03467c Iustin Pop

1012 ea03467c Iustin Pop
    """
1013 d8e0dc17 Guido Trotter
    self._nodes.pop(node_name, None)
1014 23752136 Michael Hanselmann
1015 7e950d31 Iustin Pop
  @staticmethod
1016 7e950d31 Iustin Pop
  def _CheckRpcResult(result, nodes, failmsg):
1017 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
1018 ea03467c Iustin Pop

1019 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
1020 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
1021 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
1022 ea03467c Iustin Pop

1023 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
1024 ea03467c Iustin Pop
    @type nodes: list
1025 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
1026 ea03467c Iustin Pop
    @type failmsg: str
1027 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
1028 ea03467c Iustin Pop

1029 ea03467c Iustin Pop
    """
1030 e74798c1 Michael Hanselmann
    failed = []
1031 e74798c1 Michael Hanselmann
    success = []
1032 e74798c1 Michael Hanselmann
1033 e74798c1 Michael Hanselmann
    for node in nodes:
1034 3cebe102 Michael Hanselmann
      msg = result[node].fail_msg
1035 c8457ce7 Iustin Pop
      if msg:
1036 e74798c1 Michael Hanselmann
        failed.append(node)
1037 45e0d704 Iustin Pop
        logging.error("RPC call %s (%s) failed on node %s: %s",
1038 45e0d704 Iustin Pop
                      result[node].call, failmsg, node, msg)
1039 c8457ce7 Iustin Pop
      else:
1040 c8457ce7 Iustin Pop
        success.append(node)
1041 e74798c1 Michael Hanselmann
1042 e74798c1 Michael Hanselmann
    # +1 for the master node
1043 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
1044 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
1045 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
1046 e74798c1 Michael Hanselmann
1047 99aabbed Iustin Pop
  def _GetNodeIp(self):
1048 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
1049 99aabbed Iustin Pop

1050 ea03467c Iustin Pop
    @rtype: (list, list)
1051 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
1052 ea03467c Iustin Pop
        names and the second one with the node addresses
1053 ea03467c Iustin Pop

1054 99aabbed Iustin Pop
    """
1055 e35344b4 Michael Hanselmann
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1056 99aabbed Iustin Pop
    name_list = self._nodes.keys()
1057 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
1058 99aabbed Iustin Pop
    return name_list, addr_list
1059 99aabbed Iustin Pop
1060 4c36bdf5 Guido Trotter
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1061 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
1062 8e00939c Michael Hanselmann

1063 ea03467c Iustin Pop
    This function will replace the contents of a file on the local
1064 ea03467c Iustin Pop
    node and then replicate it to all the other nodes we have.
1065 ea03467c Iustin Pop

1066 ea03467c Iustin Pop
    @type file_name: str
1067 ea03467c Iustin Pop
    @param file_name: the path of the file to be replicated
1068 ea03467c Iustin Pop
    @type data: str
1069 ea03467c Iustin Pop
    @param data: the new contents of the file
1070 4c36bdf5 Guido Trotter
    @type replicate: boolean
1071 4c36bdf5 Guido Trotter
    @param replicate: whether to spread the changes to the remote nodes
1072 ea03467c Iustin Pop

1073 8e00939c Michael Hanselmann
    """
1074 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1075 82b22e19 René Nussbaumer
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1076 82b22e19 René Nussbaumer
                    gid=getents.masterd_gid)
1077 8e00939c Michael Hanselmann
1078 4c36bdf5 Guido Trotter
    if replicate:
1079 4c36bdf5 Guido Trotter
      names, addrs = self._GetNodeIp()
1080 4c36bdf5 Guido Trotter
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1081 4c36bdf5 Guido Trotter
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1082 23752136 Michael Hanselmann
1083 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
1084 ea03467c Iustin Pop
    """Renames a file locally and then replicate the change.
1085 ea03467c Iustin Pop

1086 ea03467c Iustin Pop
    This function will rename a file in the local queue directory
1087 ea03467c Iustin Pop
    and then replicate this rename to all the other nodes we have.
1088 ea03467c Iustin Pop

1089 d7fd1f28 Michael Hanselmann
    @type rename: list of (old, new)
1090 d7fd1f28 Michael Hanselmann
    @param rename: List containing tuples mapping old to new names
1091 ea03467c Iustin Pop

1092 ea03467c Iustin Pop
    """
1093 dd875d32 Michael Hanselmann
    # Rename them locally
1094 d7fd1f28 Michael Hanselmann
    for old, new in rename:
1095 d7fd1f28 Michael Hanselmann
      utils.RenameFile(old, new, mkdir=True)
1096 abc1f2ce Michael Hanselmann
1097 dd875d32 Michael Hanselmann
    # ... and on all nodes
1098 dd875d32 Michael Hanselmann
    names, addrs = self._GetNodeIp()
1099 dd875d32 Michael Hanselmann
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1100 dd875d32 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1101 abc1f2ce Michael Hanselmann
1102 7e950d31 Iustin Pop
  @staticmethod
1103 7e950d31 Iustin Pop
  def _FormatJobID(job_id):
1104 ea03467c Iustin Pop
    """Convert a job ID to string format.
1105 ea03467c Iustin Pop

1106 ea03467c Iustin Pop
    Currently this just does C{str(job_id)} after performing some
1107 ea03467c Iustin Pop
    checks, but if we want to change the job id format this will
1108 ea03467c Iustin Pop
    abstract this change.
1109 ea03467c Iustin Pop

1110 ea03467c Iustin Pop
    @type job_id: int or long
1111 ea03467c Iustin Pop
    @param job_id: the numeric job id
1112 ea03467c Iustin Pop
    @rtype: str
1113 ea03467c Iustin Pop
    @return: the formatted job id
1114 ea03467c Iustin Pop

1115 ea03467c Iustin Pop
    """
1116 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
1117 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1118 85f03e0d Michael Hanselmann
    if job_id < 0:
1119 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1120 85f03e0d Michael Hanselmann
1121 85f03e0d Michael Hanselmann
    return str(job_id)
1122 85f03e0d Michael Hanselmann
1123 58b22b6e Michael Hanselmann
  @classmethod
1124 58b22b6e Michael Hanselmann
  def _GetArchiveDirectory(cls, job_id):
1125 58b22b6e Michael Hanselmann
    """Returns the archive directory for a job.
1126 58b22b6e Michael Hanselmann

1127 58b22b6e Michael Hanselmann
    @type job_id: str
1128 58b22b6e Michael Hanselmann
    @param job_id: Job identifier
1129 58b22b6e Michael Hanselmann
    @rtype: str
1130 58b22b6e Michael Hanselmann
    @return: Directory name
1131 58b22b6e Michael Hanselmann

1132 58b22b6e Michael Hanselmann
    """
1133 58b22b6e Michael Hanselmann
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1134 58b22b6e Michael Hanselmann
1135 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
1136 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
1137 f1da30e6 Michael Hanselmann

1138 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
1139 f1da30e6 Michael Hanselmann

1140 009e73d0 Iustin Pop
    @type count: integer
1141 009e73d0 Iustin Pop
    @param count: how many serials to return
1142 ea03467c Iustin Pop
    @rtype: str
1143 ea03467c Iustin Pop
    @return: a string representing the job identifier.
1144 f1da30e6 Michael Hanselmann

1145 f1da30e6 Michael Hanselmann
    """
1146 009e73d0 Iustin Pop
    assert count > 0
1147 f1da30e6 Michael Hanselmann
    # New number
1148 009e73d0 Iustin Pop
    serial = self._last_serial + count
1149 f1da30e6 Michael Hanselmann
1150 f1da30e6 Michael Hanselmann
    # Write to file
1151 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1152 4c36bdf5 Guido Trotter
                             "%s\n" % serial, True)
1153 f1da30e6 Michael Hanselmann
1154 009e73d0 Iustin Pop
    result = [self._FormatJobID(v)
1155 009e73d0 Iustin Pop
              for v in range(self._last_serial, serial + 1)]
1156 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
1157 f1da30e6 Michael Hanselmann
    self._last_serial = serial
1158 f1da30e6 Michael Hanselmann
1159 009e73d0 Iustin Pop
    return result
1160 f1da30e6 Michael Hanselmann
1161 85f03e0d Michael Hanselmann
  @staticmethod
1162 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
1163 ea03467c Iustin Pop
    """Returns the job file for a given job id.
1164 ea03467c Iustin Pop

1165 ea03467c Iustin Pop
    @type job_id: str
1166 ea03467c Iustin Pop
    @param job_id: the job identifier
1167 ea03467c Iustin Pop
    @rtype: str
1168 ea03467c Iustin Pop
    @return: the path to the job file
1169 ea03467c Iustin Pop

1170 ea03467c Iustin Pop
    """
1171 c4feafe8 Iustin Pop
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1172 f1da30e6 Michael Hanselmann
1173 58b22b6e Michael Hanselmann
  @classmethod
1174 58b22b6e Michael Hanselmann
  def _GetArchivedJobPath(cls, job_id):
1175 ea03467c Iustin Pop
    """Returns the archived job file for a give job id.
1176 ea03467c Iustin Pop

1177 ea03467c Iustin Pop
    @type job_id: str
1178 ea03467c Iustin Pop
    @param job_id: the job identifier
1179 ea03467c Iustin Pop
    @rtype: str
1180 ea03467c Iustin Pop
    @return: the path to the archived job file
1181 ea03467c Iustin Pop

1182 ea03467c Iustin Pop
    """
1183 0411c011 Iustin Pop
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1184 0411c011 Iustin Pop
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1185 0cb94105 Michael Hanselmann
1186 85a1c57d Guido Trotter
  def _GetJobIDsUnlocked(self, sort=True):
1187 911a495b Iustin Pop
    """Return all known job IDs.
1188 911a495b Iustin Pop

1189 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
1190 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
1191 ac0930b9 Iustin Pop
    extra IDs).
1192 ac0930b9 Iustin Pop

1193 85a1c57d Guido Trotter
    @type sort: boolean
1194 85a1c57d Guido Trotter
    @param sort: perform sorting on the returned job ids
1195 ea03467c Iustin Pop
    @rtype: list
1196 ea03467c Iustin Pop
    @return: the list of job IDs
1197 ea03467c Iustin Pop

1198 911a495b Iustin Pop
    """
1199 85a1c57d Guido Trotter
    jlist = []
1200 b5b8309d Guido Trotter
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1201 85a1c57d Guido Trotter
      m = self._RE_JOB_FILE.match(filename)
1202 85a1c57d Guido Trotter
      if m:
1203 85a1c57d Guido Trotter
        jlist.append(m.group(1))
1204 85a1c57d Guido Trotter
    if sort:
1205 85a1c57d Guido Trotter
      jlist = utils.NiceSort(jlist)
1206 f0d874fe Iustin Pop
    return jlist
1207 911a495b Iustin Pop
1208 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
1209 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
1210 ea03467c Iustin Pop

1211 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
1212 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
1213 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
1214 ea03467c Iustin Pop

1215 ea03467c Iustin Pop
    @param job_id: the job id
1216 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
1217 ea03467c Iustin Pop
    @return: either None or the job object
1218 ea03467c Iustin Pop

1219 ea03467c Iustin Pop
    """
1220 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
1221 5685c1a5 Michael Hanselmann
    if job:
1222 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
1223 5685c1a5 Michael Hanselmann
      return job
1224 ac0930b9 Iustin Pop
1225 3d6c5566 Guido Trotter
    try:
1226 3d6c5566 Guido Trotter
      job = self._LoadJobFromDisk(job_id)
1227 aa9f8167 Iustin Pop
      if job is None:
1228 aa9f8167 Iustin Pop
        return job
1229 3d6c5566 Guido Trotter
    except errors.JobFileCorrupted:
1230 3d6c5566 Guido Trotter
      old_path = self._GetJobPath(job_id)
1231 3d6c5566 Guido Trotter
      new_path = self._GetArchivedJobPath(job_id)
1232 3d6c5566 Guido Trotter
      if old_path == new_path:
1233 3d6c5566 Guido Trotter
        # job already archived (future case)
1234 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s", job_id)
1235 3d6c5566 Guido Trotter
      else:
1236 3d6c5566 Guido Trotter
        # non-archived case
1237 3d6c5566 Guido Trotter
        logging.exception("Can't parse job %s, will archive.", job_id)
1238 3d6c5566 Guido Trotter
        self._RenameFilesUnlocked([(old_path, new_path)])
1239 3d6c5566 Guido Trotter
      return None
1240 162c8636 Guido Trotter
1241 162c8636 Guido Trotter
    self._memcache[job_id] = job
1242 162c8636 Guido Trotter
    logging.debug("Added job %s to the cache", job_id)
1243 162c8636 Guido Trotter
    return job
1244 162c8636 Guido Trotter
1245 162c8636 Guido Trotter
  def _LoadJobFromDisk(self, job_id):
1246 162c8636 Guido Trotter
    """Load the given job file from disk.
1247 162c8636 Guido Trotter

1248 162c8636 Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1249 162c8636 Guido Trotter

1250 162c8636 Guido Trotter
    @type job_id: string
1251 162c8636 Guido Trotter
    @param job_id: job identifier
1252 162c8636 Guido Trotter
    @rtype: L{_QueuedJob} or None
1253 162c8636 Guido Trotter
    @return: either None or the job object
1254 162c8636 Guido Trotter

1255 162c8636 Guido Trotter
    """
1256 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
1257 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
1258 f1da30e6 Michael Hanselmann
    try:
1259 13998ef2 Michael Hanselmann
      raw_data = utils.ReadFile(filepath)
1260 162c8636 Guido Trotter
    except EnvironmentError, err:
1261 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
1262 f1da30e6 Michael Hanselmann
        return None
1263 f1da30e6 Michael Hanselmann
      raise
1264 13998ef2 Michael Hanselmann
1265 94ed59a5 Iustin Pop
    try:
1266 162c8636 Guido Trotter
      data = serializer.LoadJson(raw_data)
1267 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
1268 7260cfbe Iustin Pop
    except Exception, err: # pylint: disable-msg=W0703
1269 3d6c5566 Guido Trotter
      raise errors.JobFileCorrupted(err)
1270 94ed59a5 Iustin Pop
1271 ac0930b9 Iustin Pop
    return job
1272 f1da30e6 Michael Hanselmann
1273 0f9c08dc Guido Trotter
  def SafeLoadJobFromDisk(self, job_id):
1274 0f9c08dc Guido Trotter
    """Load the given job file from disk.
1275 0f9c08dc Guido Trotter

1276 0f9c08dc Guido Trotter
    Given a job file, read, load and restore it in a _QueuedJob format.
1277 0f9c08dc Guido Trotter
    In case of error reading the job, it gets returned as None, and the
1278 0f9c08dc Guido Trotter
    exception is logged.
1279 0f9c08dc Guido Trotter

1280 0f9c08dc Guido Trotter
    @type job_id: string
1281 0f9c08dc Guido Trotter
    @param job_id: job identifier
1282 0f9c08dc Guido Trotter
    @rtype: L{_QueuedJob} or None
1283 0f9c08dc Guido Trotter
    @return: either None or the job object
1284 0f9c08dc Guido Trotter

1285 0f9c08dc Guido Trotter
    """
1286 0f9c08dc Guido Trotter
    try:
1287 0f9c08dc Guido Trotter
      return self._LoadJobFromDisk(job_id)
1288 0f9c08dc Guido Trotter
    except (errors.JobFileCorrupted, EnvironmentError):
1289 0f9c08dc Guido Trotter
      logging.exception("Can't load/parse job %s", job_id)
1290 0f9c08dc Guido Trotter
      return None
1291 0f9c08dc Guido Trotter
1292 686d7433 Iustin Pop
  @staticmethod
1293 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
1294 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
1295 686d7433 Iustin Pop

1296 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
1297 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
1298 686d7433 Iustin Pop

1299 ea03467c Iustin Pop
    @rtype: boolean
1300 ea03467c Iustin Pop
    @return: True of the job queue is marked for draining
1301 ea03467c Iustin Pop

1302 686d7433 Iustin Pop
    """
1303 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1304 686d7433 Iustin Pop
1305 20571a26 Guido Trotter
  def _UpdateQueueSizeUnlocked(self):
1306 20571a26 Guido Trotter
    """Update the queue size.
1307 20571a26 Guido Trotter

1308 20571a26 Guido Trotter
    """
1309 20571a26 Guido Trotter
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1310 20571a26 Guido Trotter
1311 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1312 20571a26 Guido Trotter
  @_RequireOpenQueue
1313 20571a26 Guido Trotter
  def SetDrainFlag(self, drain_flag):
1314 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
1315 3ccafd0e Iustin Pop

1316 ea03467c Iustin Pop
    @type drain_flag: boolean
1317 5bbd3f7f Michael Hanselmann
    @param drain_flag: Whether to set or unset the drain flag
1318 ea03467c Iustin Pop

1319 3ccafd0e Iustin Pop
    """
1320 82b22e19 René Nussbaumer
    getents = runtime.GetEnts()
1321 82b22e19 René Nussbaumer
1322 3ccafd0e Iustin Pop
    if drain_flag:
1323 82b22e19 René Nussbaumer
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1324 82b22e19 René Nussbaumer
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
1325 3ccafd0e Iustin Pop
    else:
1326 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1327 20571a26 Guido Trotter
1328 20571a26 Guido Trotter
    self._drained = drain_flag
1329 20571a26 Guido Trotter
1330 3ccafd0e Iustin Pop
    return True
1331 3ccafd0e Iustin Pop
1332 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1333 009e73d0 Iustin Pop
  def _SubmitJobUnlocked(self, job_id, ops):
1334 85f03e0d Michael Hanselmann
    """Create and store a new job.
1335 f1da30e6 Michael Hanselmann

1336 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
1337 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
1338 c3f0a12f Iustin Pop

1339 009e73d0 Iustin Pop
    @type job_id: job ID
1340 69b99987 Michael Hanselmann
    @param job_id: the job ID for the new job
1341 c3f0a12f Iustin Pop
    @type ops: list
1342 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
1343 7beb1e53 Guido Trotter
    @rtype: L{_QueuedJob}
1344 7beb1e53 Guido Trotter
    @return: the job object to be queued
1345 7beb1e53 Guido Trotter
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1346 7beb1e53 Guido Trotter
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1347 c3f0a12f Iustin Pop

1348 c3f0a12f Iustin Pop
    """
1349 20571a26 Guido Trotter
    # Ok when sharing the big job queue lock, as the drain file is created when
1350 20571a26 Guido Trotter
    # the lock is exclusive.
1351 20571a26 Guido Trotter
    if self._drained:
1352 2971c913 Iustin Pop
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1353 f87b405e Michael Hanselmann
1354 20571a26 Guido Trotter
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1355 f87b405e Michael Hanselmann
      raise errors.JobQueueFull()
1356 f87b405e Michael Hanselmann
1357 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
1358 f1da30e6 Michael Hanselmann
1359 f1da30e6 Michael Hanselmann
    # Write to disk
1360 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
1361 f1da30e6 Michael Hanselmann
1362 20571a26 Guido Trotter
    self._queue_size += 1
1363 20571a26 Guido Trotter
1364 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
1365 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
1366 ac0930b9 Iustin Pop
1367 7beb1e53 Guido Trotter
    return job
1368 f1da30e6 Michael Hanselmann
1369 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1370 2971c913 Iustin Pop
  @_RequireOpenQueue
1371 2971c913 Iustin Pop
  def SubmitJob(self, ops):
1372 2971c913 Iustin Pop
    """Create and store a new job.
1373 2971c913 Iustin Pop

1374 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1375 2971c913 Iustin Pop

1376 2971c913 Iustin Pop
    """
1377 009e73d0 Iustin Pop
    job_id = self._NewSerialsUnlocked(1)[0]
1378 b2e8a4d9 Michael Hanselmann
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1379 7beb1e53 Guido Trotter
    return job_id
1380 2971c913 Iustin Pop
1381 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1382 2971c913 Iustin Pop
  @_RequireOpenQueue
1383 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
1384 2971c913 Iustin Pop
    """Create and store multiple jobs.
1385 2971c913 Iustin Pop

1386 2971c913 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1387 2971c913 Iustin Pop

1388 2971c913 Iustin Pop
    """
1389 2971c913 Iustin Pop
    results = []
1390 7beb1e53 Guido Trotter
    tasks = []
1391 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1392 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1393 2971c913 Iustin Pop
      try:
1394 7beb1e53 Guido Trotter
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1395 2971c913 Iustin Pop
        status = True
1396 7beb1e53 Guido Trotter
        data = job_id
1397 2971c913 Iustin Pop
      except errors.GenericError, err:
1398 2971c913 Iustin Pop
        data = str(err)
1399 2971c913 Iustin Pop
        status = False
1400 2971c913 Iustin Pop
      results.append((status, data))
1401 7beb1e53 Guido Trotter
    self._wpool.AddManyTasks(tasks)
1402 2971c913 Iustin Pop
1403 2971c913 Iustin Pop
    return results
1404 2971c913 Iustin Pop
1405 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1406 4c36bdf5 Guido Trotter
  def UpdateJobUnlocked(self, job, replicate=True):
1407 ea03467c Iustin Pop
    """Update a job's on disk storage.
1408 ea03467c Iustin Pop

1409 ea03467c Iustin Pop
    After a job has been modified, this function needs to be called in
1410 ea03467c Iustin Pop
    order to write the changes to disk and replicate them to the other
1411 ea03467c Iustin Pop
    nodes.
1412 ea03467c Iustin Pop

1413 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
1414 ea03467c Iustin Pop
    @param job: the changed job
1415 4c36bdf5 Guido Trotter
    @type replicate: boolean
1416 4c36bdf5 Guido Trotter
    @param replicate: whether to replicate the change to remote nodes
1417 ea03467c Iustin Pop

1418 ea03467c Iustin Pop
    """
1419 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
1420 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
1421 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
1422 4c36bdf5 Guido Trotter
    self._UpdateJobQueueFile(filename, data, replicate)
1423 ac0930b9 Iustin Pop
1424 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1425 5c735209 Iustin Pop
                        timeout):
1426 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
1427 6c5a7090 Michael Hanselmann

1428 6c5a7090 Michael Hanselmann
    @type job_id: string
1429 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
1430 6c5a7090 Michael Hanselmann
    @type fields: list of strings
1431 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
1432 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
1433 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
1434 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
1435 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
1436 5c735209 Iustin Pop
    @type timeout: float
1437 989a8bee Michael Hanselmann
    @param timeout: maximum time to wait in seconds
1438 ea03467c Iustin Pop
    @rtype: tuple (job info, log entries)
1439 ea03467c Iustin Pop
    @return: a tuple of the job information as required via
1440 ea03467c Iustin Pop
        the fields parameter, and the log entries as a list
1441 ea03467c Iustin Pop

1442 ea03467c Iustin Pop
        if the job has not changed and the timeout has expired,
1443 ea03467c Iustin Pop
        we instead return a special value,
1444 ea03467c Iustin Pop
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1445 ea03467c Iustin Pop
        as such by the clients
1446 6c5a7090 Michael Hanselmann

1447 6c5a7090 Michael Hanselmann
    """
1448 989a8bee Michael Hanselmann
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1449 989a8bee Michael Hanselmann
1450 989a8bee Michael Hanselmann
    helper = _WaitForJobChangesHelper()
1451 989a8bee Michael Hanselmann
1452 989a8bee Michael Hanselmann
    return helper(self._GetJobPath(job_id), load_fn,
1453 989a8bee Michael Hanselmann
                  fields, prev_job_info, prev_log_serial, timeout)
1454 dfe57c22 Michael Hanselmann
1455 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1456 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1457 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
1458 188c5e0a Michael Hanselmann
    """Cancels a job.
1459 188c5e0a Michael Hanselmann

1460 ea03467c Iustin Pop
    This will only succeed if the job has not started yet.
1461 ea03467c Iustin Pop

1462 188c5e0a Michael Hanselmann
    @type job_id: string
1463 ea03467c Iustin Pop
    @param job_id: job ID of job to be cancelled.
1464 188c5e0a Michael Hanselmann

1465 188c5e0a Michael Hanselmann
    """
1466 fbf0262f Michael Hanselmann
    logging.info("Cancelling job %s", job_id)
1467 188c5e0a Michael Hanselmann
1468 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1469 188c5e0a Michael Hanselmann
    if not job:
1470 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1471 fbf0262f Michael Hanselmann
      return (False, "Job %s not found" % job_id)
1472 fbf0262f Michael Hanselmann
1473 fbf0262f Michael Hanselmann
    job_status = job.CalcStatus()
1474 188c5e0a Michael Hanselmann
1475 fbf0262f Michael Hanselmann
    if job_status not in (constants.JOB_STATUS_QUEUED,
1476 fbf0262f Michael Hanselmann
                          constants.JOB_STATUS_WAITLOCK):
1477 a9e97393 Michael Hanselmann
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1478 a9e97393 Michael Hanselmann
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1479 fbf0262f Michael Hanselmann
1480 fbf0262f Michael Hanselmann
    if job_status == constants.JOB_STATUS_QUEUED:
1481 39ed3a98 Guido Trotter
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1482 39ed3a98 Guido Trotter
                            "Job canceled by request")
1483 fbf0262f Michael Hanselmann
      return (True, "Job %s canceled" % job.id)
1484 188c5e0a Michael Hanselmann
1485 fbf0262f Michael Hanselmann
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1486 fbf0262f Michael Hanselmann
      # The worker will notice the new status and cancel the job
1487 39ed3a98 Guido Trotter
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1488 fbf0262f Michael Hanselmann
      return (True, "Job %s will be canceled" % job.id)
1489 fbf0262f Michael Hanselmann
1490 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
1491 d7fd1f28 Michael Hanselmann
  def _ArchiveJobsUnlocked(self, jobs):
1492 d7fd1f28 Michael Hanselmann
    """Archives jobs.
1493 c609f802 Michael Hanselmann

1494 d7fd1f28 Michael Hanselmann
    @type jobs: list of L{_QueuedJob}
1495 25e7b43f Iustin Pop
    @param jobs: Job objects
1496 d7fd1f28 Michael Hanselmann
    @rtype: int
1497 d7fd1f28 Michael Hanselmann
    @return: Number of archived jobs
1498 c609f802 Michael Hanselmann

1499 c609f802 Michael Hanselmann
    """
1500 d7fd1f28 Michael Hanselmann
    archive_jobs = []
1501 d7fd1f28 Michael Hanselmann
    rename_files = []
1502 d7fd1f28 Michael Hanselmann
    for job in jobs:
1503 989a8bee Michael Hanselmann
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1504 d7fd1f28 Michael Hanselmann
        logging.debug("Job %s is not yet done", job.id)
1505 d7fd1f28 Michael Hanselmann
        continue
1506 c609f802 Michael Hanselmann
1507 d7fd1f28 Michael Hanselmann
      archive_jobs.append(job)
1508 c609f802 Michael Hanselmann
1509 d7fd1f28 Michael Hanselmann
      old = self._GetJobPath(job.id)
1510 d7fd1f28 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
1511 d7fd1f28 Michael Hanselmann
      rename_files.append((old, new))
1512 c609f802 Michael Hanselmann
1513 d7fd1f28 Michael Hanselmann
    # TODO: What if 1..n files fail to rename?
1514 d7fd1f28 Michael Hanselmann
    self._RenameFilesUnlocked(rename_files)
1515 f1da30e6 Michael Hanselmann
1516 d7fd1f28 Michael Hanselmann
    logging.debug("Successfully archived job(s) %s",
1517 1f864b60 Iustin Pop
                  utils.CommaJoin(job.id for job in archive_jobs))
1518 d7fd1f28 Michael Hanselmann
1519 20571a26 Guido Trotter
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1520 20571a26 Guido Trotter
    # the files, we update the cached queue size from the filesystem. When we
1521 20571a26 Guido Trotter
    # get around to fix the TODO: above, we can use the number of actually
1522 20571a26 Guido Trotter
    # archived jobs to fix this.
1523 20571a26 Guido Trotter
    self._UpdateQueueSizeUnlocked()
1524 d7fd1f28 Michael Hanselmann
    return len(archive_jobs)
1525 78d12585 Michael Hanselmann
1526 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1527 07cd723a Iustin Pop
  @_RequireOpenQueue
1528 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
1529 07cd723a Iustin Pop
    """Archives a job.
1530 07cd723a Iustin Pop

1531 25e7b43f Iustin Pop
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1532 ea03467c Iustin Pop

1533 07cd723a Iustin Pop
    @type job_id: string
1534 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
1535 78d12585 Michael Hanselmann
    @rtype: bool
1536 78d12585 Michael Hanselmann
    @return: Whether job was archived
1537 07cd723a Iustin Pop

1538 07cd723a Iustin Pop
    """
1539 78d12585 Michael Hanselmann
    logging.info("Archiving job %s", job_id)
1540 78d12585 Michael Hanselmann
1541 78d12585 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
1542 78d12585 Michael Hanselmann
    if not job:
1543 78d12585 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
1544 78d12585 Michael Hanselmann
      return False
1545 78d12585 Michael Hanselmann
1546 5278185a Iustin Pop
    return self._ArchiveJobsUnlocked([job]) == 1
1547 07cd723a Iustin Pop
1548 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1549 07cd723a Iustin Pop
  @_RequireOpenQueue
1550 f8ad5591 Michael Hanselmann
  def AutoArchiveJobs(self, age, timeout):
1551 07cd723a Iustin Pop
    """Archives all jobs based on age.
1552 07cd723a Iustin Pop

1553 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
1554 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
1555 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
1556 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
1557 07cd723a Iustin Pop

1558 07cd723a Iustin Pop
    @type age: int
1559 07cd723a Iustin Pop
    @param age: the minimum age in seconds
1560 07cd723a Iustin Pop

1561 07cd723a Iustin Pop
    """
1562 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
1563 07cd723a Iustin Pop
1564 07cd723a Iustin Pop
    now = time.time()
1565 f8ad5591 Michael Hanselmann
    end_time = now + timeout
1566 f8ad5591 Michael Hanselmann
    archived_count = 0
1567 f8ad5591 Michael Hanselmann
    last_touched = 0
1568 f8ad5591 Michael Hanselmann
1569 69b03fd7 Guido Trotter
    all_job_ids = self._GetJobIDsUnlocked()
1570 d7fd1f28 Michael Hanselmann
    pending = []
1571 f8ad5591 Michael Hanselmann
    for idx, job_id in enumerate(all_job_ids):
1572 d2c8afb1 Michael Hanselmann
      last_touched = idx + 1
1573 f8ad5591 Michael Hanselmann
1574 d7fd1f28 Michael Hanselmann
      # Not optimal because jobs could be pending
1575 d7fd1f28 Michael Hanselmann
      # TODO: Measure average duration for job archival and take number of
1576 d7fd1f28 Michael Hanselmann
      # pending jobs into account.
1577 f8ad5591 Michael Hanselmann
      if time.time() > end_time:
1578 f8ad5591 Michael Hanselmann
        break
1579 f8ad5591 Michael Hanselmann
1580 78d12585 Michael Hanselmann
      # Returns None if the job failed to load
1581 78d12585 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
1582 f8ad5591 Michael Hanselmann
      if job:
1583 f8ad5591 Michael Hanselmann
        if job.end_timestamp is None:
1584 f8ad5591 Michael Hanselmann
          if job.start_timestamp is None:
1585 f8ad5591 Michael Hanselmann
            job_age = job.received_timestamp
1586 f8ad5591 Michael Hanselmann
          else:
1587 f8ad5591 Michael Hanselmann
            job_age = job.start_timestamp
1588 07cd723a Iustin Pop
        else:
1589 f8ad5591 Michael Hanselmann
          job_age = job.end_timestamp
1590 f8ad5591 Michael Hanselmann
1591 f8ad5591 Michael Hanselmann
        if age == -1 or now - job_age[0] > age:
1592 d7fd1f28 Michael Hanselmann
          pending.append(job)
1593 d7fd1f28 Michael Hanselmann
1594 d7fd1f28 Michael Hanselmann
          # Archive 10 jobs at a time
1595 d7fd1f28 Michael Hanselmann
          if len(pending) >= 10:
1596 d7fd1f28 Michael Hanselmann
            archived_count += self._ArchiveJobsUnlocked(pending)
1597 d7fd1f28 Michael Hanselmann
            pending = []
1598 f8ad5591 Michael Hanselmann
1599 d7fd1f28 Michael Hanselmann
    if pending:
1600 d7fd1f28 Michael Hanselmann
      archived_count += self._ArchiveJobsUnlocked(pending)
1601 07cd723a Iustin Pop
1602 d2c8afb1 Michael Hanselmann
    return (archived_count, len(all_job_ids) - last_touched)
1603 07cd723a Iustin Pop
1604 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
1605 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
1606 e2715f69 Michael Hanselmann

1607 ea03467c Iustin Pop
    @type job_ids: list
1608 ea03467c Iustin Pop
    @param job_ids: sequence of job identifiers or None for all
1609 ea03467c Iustin Pop
    @type fields: list
1610 ea03467c Iustin Pop
    @param fields: names of fields to return
1611 ea03467c Iustin Pop
    @rtype: list
1612 ea03467c Iustin Pop
    @return: list one element per job, each element being list with
1613 ea03467c Iustin Pop
        the requested fields
1614 e2715f69 Michael Hanselmann

1615 e2715f69 Michael Hanselmann
    """
1616 85f03e0d Michael Hanselmann
    jobs = []
1617 9f7b4967 Guido Trotter
    list_all = False
1618 9f7b4967 Guido Trotter
    if not job_ids:
1619 9f7b4967 Guido Trotter
      # Since files are added to/removed from the queue atomically, there's no
1620 9f7b4967 Guido Trotter
      # risk of getting the job ids in an inconsistent state.
1621 9f7b4967 Guido Trotter
      job_ids = self._GetJobIDsUnlocked()
1622 9f7b4967 Guido Trotter
      list_all = True
1623 e2715f69 Michael Hanselmann
1624 9f7b4967 Guido Trotter
    for job_id in job_ids:
1625 9f7b4967 Guido Trotter
      job = self.SafeLoadJobFromDisk(job_id)
1626 9f7b4967 Guido Trotter
      if job is not None:
1627 6a290889 Guido Trotter
        jobs.append(job.GetInfo(fields))
1628 9f7b4967 Guido Trotter
      elif not list_all:
1629 9f7b4967 Guido Trotter
        jobs.append(None)
1630 e2715f69 Michael Hanselmann
1631 85f03e0d Michael Hanselmann
    return jobs
1632 e2715f69 Michael Hanselmann
1633 ebb80afa Guido Trotter
  @locking.ssynchronized(_LOCK)
1634 db37da70 Michael Hanselmann
  @_RequireOpenQueue
1635 e2715f69 Michael Hanselmann
  def Shutdown(self):
1636 e2715f69 Michael Hanselmann
    """Stops the job queue.
1637 e2715f69 Michael Hanselmann

1638 ea03467c Iustin Pop
    This shutdowns all the worker threads an closes the queue.
1639 ea03467c Iustin Pop

1640 e2715f69 Michael Hanselmann
    """
1641 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
1642 85f03e0d Michael Hanselmann
1643 a71f9c7d Guido Trotter
    self._queue_filelock.Close()
1644 a71f9c7d Guido Trotter
    self._queue_filelock = None