Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 99aabbed

History | View | Annotate | Download (25.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 72737a7f Iustin Pop
from ganeti.rpc import RpcRunner
49 e2715f69 Michael Hanselmann
50 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
51 e2715f69 Michael Hanselmann
52 498ae1cc Iustin Pop
53 70552c46 Michael Hanselmann
def TimeStampNow():
  """Return the current time, split by utils.SplitTime.

  """
  now = time.time()
  return utils.SplitTime(now)
55 70552c46 Michael Hanselmann
56 70552c46 Michael Hanselmann
57 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
58 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
59 e2715f69 Michael Hanselmann

60 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
61 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
62 f1048938 Iustin Pop

63 e2715f69 Michael Hanselmann
  """
64 85f03e0d Michael Hanselmann
  def __init__(self, op):
65 85f03e0d Michael Hanselmann
    self.input = op
66 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
67 85f03e0d Michael Hanselmann
    self.result = None
68 85f03e0d Michael Hanselmann
    self.log = []
69 70552c46 Michael Hanselmann
    self.start_timestamp = None
70 70552c46 Michael Hanselmann
    self.end_timestamp = None
71 f1da30e6 Michael Hanselmann
72 f1da30e6 Michael Hanselmann
  @classmethod
73 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
74 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
75 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
76 85f03e0d Michael Hanselmann
    obj.status = state["status"]
77 85f03e0d Michael Hanselmann
    obj.result = state["result"]
78 85f03e0d Michael Hanselmann
    obj.log = state["log"]
79 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
80 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
81 f1da30e6 Michael Hanselmann
    return obj
82 f1da30e6 Michael Hanselmann
83 f1da30e6 Michael Hanselmann
  def Serialize(self):
84 6c5a7090 Michael Hanselmann
    return {
85 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
86 6c5a7090 Michael Hanselmann
      "status": self.status,
87 6c5a7090 Michael Hanselmann
      "result": self.result,
88 6c5a7090 Michael Hanselmann
      "log": self.log,
89 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
90 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
91 6c5a7090 Michael Hanselmann
      }
92 f1048938 Iustin Pop
93 e2715f69 Michael Hanselmann
94 e2715f69 Michael Hanselmann
class _QueuedJob(object):
95 e2715f69 Michael Hanselmann
  """In-memory job representation.
96 e2715f69 Michael Hanselmann

97 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
98 6c5a7090 Michael Hanselmann
  care of by users of this class.
99 e2715f69 Michael Hanselmann

100 e2715f69 Michael Hanselmann
  """
101 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
102 e2715f69 Michael Hanselmann
    if not ops:
103 e2715f69 Michael Hanselmann
      # TODO
104 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
105 e2715f69 Michael Hanselmann
106 85f03e0d Michael Hanselmann
    self.queue = queue
107 f1da30e6 Michael Hanselmann
    self.id = job_id
108 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
109 85f03e0d Michael Hanselmann
    self.run_op_index = -1
110 6c5a7090 Michael Hanselmann
    self.log_serial = 0
111 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
112 c56ec146 Iustin Pop
    self.start_timestamp = None
113 c56ec146 Iustin Pop
    self.end_timestamp = None
114 6c5a7090 Michael Hanselmann
115 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
116 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
117 f1da30e6 Michael Hanselmann
118 f1da30e6 Michael Hanselmann
  @classmethod
119 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
120 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
121 85f03e0d Michael Hanselmann
    obj.queue = queue
122 85f03e0d Michael Hanselmann
    obj.id = state["id"]
123 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
124 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
125 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
126 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
127 6c5a7090 Michael Hanselmann
128 6c5a7090 Michael Hanselmann
    obj.ops = []
129 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
130 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
131 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
132 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
133 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
134 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
135 6c5a7090 Michael Hanselmann
136 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
137 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
138 6c5a7090 Michael Hanselmann
139 f1da30e6 Michael Hanselmann
    return obj
140 f1da30e6 Michael Hanselmann
141 f1da30e6 Michael Hanselmann
  def Serialize(self):
142 f1da30e6 Michael Hanselmann
    return {
143 f1da30e6 Michael Hanselmann
      "id": self.id,
144 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
145 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
146 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
147 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
148 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
149 f1da30e6 Michael Hanselmann
      }
150 f1da30e6 Michael Hanselmann
151 85f03e0d Michael Hanselmann
  def CalcStatus(self):
152 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
153 e2715f69 Michael Hanselmann
154 e2715f69 Michael Hanselmann
    all_success = True
155 85f03e0d Michael Hanselmann
    for op in self.ops:
156 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
157 e2715f69 Michael Hanselmann
        continue
158 e2715f69 Michael Hanselmann
159 e2715f69 Michael Hanselmann
      all_success = False
160 e2715f69 Michael Hanselmann
161 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
162 e2715f69 Michael Hanselmann
        pass
163 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
164 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
165 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
166 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
167 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
168 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
169 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
170 f1da30e6 Michael Hanselmann
        break
171 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
172 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
173 4cb1d919 Michael Hanselmann
        break
174 e2715f69 Michael Hanselmann
175 e2715f69 Michael Hanselmann
    if all_success:
176 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
177 e2715f69 Michael Hanselmann
178 e2715f69 Michael Hanselmann
    return status
179 e2715f69 Michael Hanselmann
180 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
181 6c5a7090 Michael Hanselmann
    if newer_than is None:
182 6c5a7090 Michael Hanselmann
      serial = -1
183 6c5a7090 Michael Hanselmann
    else:
184 6c5a7090 Michael Hanselmann
      serial = newer_than
185 6c5a7090 Michael Hanselmann
186 6c5a7090 Michael Hanselmann
    entries = []
187 6c5a7090 Michael Hanselmann
    for op in self.ops:
188 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
189 6c5a7090 Michael Hanselmann
190 6c5a7090 Michael Hanselmann
    return entries
191 6c5a7090 Michael Hanselmann
192 f1048938 Iustin Pop
193 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
194 e92376d7 Iustin Pop
  def _NotifyStart(self):
195 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
196 e92376d7 Iustin Pop

197 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
198 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
199 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
200 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
201 e92376d7 Iustin Pop

202 e92376d7 Iustin Pop
    """
203 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
204 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
205 e92376d7 Iustin Pop
206 e92376d7 Iustin Pop
    self.queue.acquire()
207 e92376d7 Iustin Pop
    try:
208 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
209 e92376d7 Iustin Pop
    finally:
210 e92376d7 Iustin Pop
      self.queue.release()
211 e92376d7 Iustin Pop
212 85f03e0d Michael Hanselmann
  def RunTask(self, job):
213 e2715f69 Michael Hanselmann
    """Job executor.
214 e2715f69 Michael Hanselmann

215 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
216 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
217 e2715f69 Michael Hanselmann

218 e2715f69 Michael Hanselmann
    """
219 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
220 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
221 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
222 e92376d7 Iustin Pop
    self.queue = queue = job.queue
223 e2715f69 Michael Hanselmann
    try:
224 85f03e0d Michael Hanselmann
      try:
225 85f03e0d Michael Hanselmann
        count = len(job.ops)
226 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
227 85f03e0d Michael Hanselmann
          try:
228 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
229 85f03e0d Michael Hanselmann
230 85f03e0d Michael Hanselmann
            queue.acquire()
231 85f03e0d Michael Hanselmann
            try:
232 85f03e0d Michael Hanselmann
              job.run_op_index = idx
233 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
234 85f03e0d Michael Hanselmann
              op.result = None
235 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
236 c56ec146 Iustin Pop
              if idx == 0: # first opcode
237 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
238 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
239 85f03e0d Michael Hanselmann
240 38206f3c Iustin Pop
              input_opcode = op.input
241 85f03e0d Michael Hanselmann
            finally:
242 85f03e0d Michael Hanselmann
              queue.release()
243 85f03e0d Michael Hanselmann
244 dfe57c22 Michael Hanselmann
            def _Log(*args):
245 6c5a7090 Michael Hanselmann
              """Append a log entry.
246 6c5a7090 Michael Hanselmann

247 6c5a7090 Michael Hanselmann
              """
248 6c5a7090 Michael Hanselmann
              assert len(args) < 3
249 6c5a7090 Michael Hanselmann
250 6c5a7090 Michael Hanselmann
              if len(args) == 1:
251 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
252 6c5a7090 Michael Hanselmann
                log_msg = args[0]
253 6c5a7090 Michael Hanselmann
              else:
254 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
255 6c5a7090 Michael Hanselmann
256 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
257 6c5a7090 Michael Hanselmann
              # precision.
258 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
259 dfe57c22 Michael Hanselmann
260 6c5a7090 Michael Hanselmann
              queue.acquire()
261 dfe57c22 Michael Hanselmann
              try:
262 6c5a7090 Michael Hanselmann
                job.log_serial += 1
263 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
264 6c5a7090 Michael Hanselmann
265 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
266 dfe57c22 Michael Hanselmann
              finally:
267 6c5a7090 Michael Hanselmann
                queue.release()
268 dfe57c22 Michael Hanselmann
269 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
270 e92376d7 Iustin Pop
            self.opcode = op
271 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
272 85f03e0d Michael Hanselmann
273 85f03e0d Michael Hanselmann
            queue.acquire()
274 85f03e0d Michael Hanselmann
            try:
275 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
276 85f03e0d Michael Hanselmann
              op.result = result
277 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
278 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
279 85f03e0d Michael Hanselmann
            finally:
280 85f03e0d Michael Hanselmann
              queue.release()
281 85f03e0d Michael Hanselmann
282 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
283 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
284 85f03e0d Michael Hanselmann
          except Exception, err:
285 85f03e0d Michael Hanselmann
            queue.acquire()
286 85f03e0d Michael Hanselmann
            try:
287 85f03e0d Michael Hanselmann
              try:
288 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
289 85f03e0d Michael Hanselmann
                op.result = str(err)
290 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
291 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
292 85f03e0d Michael Hanselmann
              finally:
293 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
294 85f03e0d Michael Hanselmann
            finally:
295 85f03e0d Michael Hanselmann
              queue.release()
296 85f03e0d Michael Hanselmann
            raise
297 85f03e0d Michael Hanselmann
298 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
299 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
300 85f03e0d Michael Hanselmann
      except:
301 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
302 e2715f69 Michael Hanselmann
    finally:
303 85f03e0d Michael Hanselmann
      queue.acquire()
304 85f03e0d Michael Hanselmann
      try:
305 65548ed5 Michael Hanselmann
        try:
306 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
307 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
308 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
309 65548ed5 Michael Hanselmann
        finally:
310 65548ed5 Michael Hanselmann
          job_id = job.id
311 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
312 85f03e0d Michael Hanselmann
      finally:
313 85f03e0d Michael Hanselmann
        queue.release()
314 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
315 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
316 e2715f69 Michael Hanselmann
317 e2715f69 Michael Hanselmann
318 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool using _JobQueueWorker, with a reference to its queue.

  """
  def __init__(self, queue):
    """Initialize the pool and remember the queue.

    JOBQUEUE_THREADS and the _JobQueueWorker class are passed to the
    base class constructor.

    @param queue: the queue object, made available as the "queue" attribute

    """
    base_init = super(_JobQueueWorkerPool, self).__init__
    base_init(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
323 e2715f69 Michael Hanselmann
324 e2715f69 Michael Hanselmann
325 85f03e0d Michael Hanselmann
class JobQueue(object):
326 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
327 f1da30e6 Michael Hanselmann
328 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
329 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
330 db37da70 Michael Hanselmann

331 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
332 db37da70 Michael Hanselmann
    usually called from other classes.
333 db37da70 Michael Hanselmann

334 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
335 db37da70 Michael Hanselmann

336 db37da70 Michael Hanselmann
    Example:
337 db37da70 Michael Hanselmann
      @utils.LockedMethod
338 db37da70 Michael Hanselmann
      @_RequireOpenQueue
339 db37da70 Michael Hanselmann
      def Example(self):
340 db37da70 Michael Hanselmann
        pass
341 db37da70 Michael Hanselmann

342 db37da70 Michael Hanselmann
    """
343 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
344 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
345 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
346 db37da70 Michael Hanselmann
    return wrapper
347 db37da70 Michael Hanselmann
348 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Open the queue and resume or clean up the jobs found in it.

    Jobs still queued are handed to the worker pool. Jobs found in the
    running or lock-waiting state are considered victims of an unclean
    shutdown: all their opcodes are marked as failed and the job is saved.

    @param context: object providing the cluster configuration as its
        "cfg" attribute; also kept as the "context" attribute

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node. self._nodes is a dictionary, hence a missing
    # entry is signalled by KeyError (not ValueError, as for list.remove).
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            # Save whatever was changed, even on errors
            self.UpdateJobUnlocked(job)
    finally:
      self.release()
406 85f03e0d Michael Hanselmann
407 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue on the node is purged, then all current job files and the
    serial file are uploaded to it. Failed uploads are only logged.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(name)

    # Upload the whole queue excluding archived jobs...
    to_upload = [self._GetJobPath(jid) for jid in self._GetJobIDsUnlocked()]
    # ... followed by the current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Read file content
      handle = open(path, "r")
      try:
        payload = handle.read()
      finally:
        handle.close()

      reply = RpcRunner.call_jobqueue_update([name], [node.primary_ip],
                                             path, payload)
      if not reply[name]:
        logging.error("Failed to upload %s to %s", path, name)

    self._nodes[name] = node.primary_ip
442 d2e03a33 Michael Hanselmann
443 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Forget a node; unknown node names are silently ignored.

    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)
451 23752136 Michael Hanselmann
452 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
453 e74798c1 Michael Hanselmann
    failed = []
454 e74798c1 Michael Hanselmann
    success = []
455 e74798c1 Michael Hanselmann
456 e74798c1 Michael Hanselmann
    for node in nodes:
457 e74798c1 Michael Hanselmann
      if result[node]:
458 e74798c1 Michael Hanselmann
        success.append(node)
459 e74798c1 Michael Hanselmann
      else:
460 e74798c1 Michael Hanselmann
        failed.append(node)
461 e74798c1 Michael Hanselmann
462 e74798c1 Michael Hanselmann
    if failed:
463 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
464 e74798c1 Michael Hanselmann
465 e74798c1 Michael Hanselmann
    # +1 for the master node
466 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
467 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
468 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
469 e74798c1 Michael Hanselmann
470 99aabbed Iustin Pop
  def _GetNodeIp(self):
471 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
472 99aabbed Iustin Pop

473 99aabbed Iustin Pop
    """
474 99aabbed Iustin Pop
    name_list = self._nodes.keys()
475 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
476 99aabbed Iustin Pop
    return name_list, addr_list
477 99aabbed Iustin Pop
478 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Write a file locally, then send it to all known nodes.

    Nodes for which the update failed are reported by _CheckRpcResult.

    @param file_name: the name of the file to write
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                file_name, data)
    failmsg = "Updating %s" % file_name
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
488 23752136 Michael Hanselmann
489 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
    """Rename a file locally, then on all known nodes.

    Nodes for which the rename failed are reported by _CheckRpcResult.

    @param old: the current file name
    @param new: the new file name

    """
    os.rename(old, new)

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                old, new)
    failmsg = "Moving %s to %s" % (old, new)
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
496 abc1f2ce Michael Hanselmann
497 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
    """Convert a numeric job ID to its string form.

    @param job_id: the job ID, a non-negative int or long
    @return: the job ID as a string
    @raise errors.ProgrammerError: if the job ID is not an integer or is
        negative

    """
    is_integer = isinstance(job_id, (int, long))
    if not is_integer:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    elif job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
504 85f03e0d Michael Hanselmann
505 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    The new serial number is written to the serial file (and replicated)
    before it is remembered as the last one used.

    Returns: A string representing the job identifier.

    """
    # New number
    next_serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % next_serial)

    # Keep it only if we were able to write the file
    self._last_serial = next_serial

    job_id = self._FormatJobID(next_serial)
    return job_id
524 f1da30e6 Michael Hanselmann
525 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Return the path of the file "job-<job_id>" in the queue directory.

    """
    queue_dir = constants.QUEUE_DIR
    file_name = "job-%s" % job_id
    return os.path.join(queue_dir, file_name)
528 f1da30e6 Michael Hanselmann
529 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Return the path of the file "job-<job_id>" in the archive directory.

    """
    archive_dir = constants.JOB_QUEUE_ARCHIVE_DIR
    file_name = "job-%s" % job_id
    return os.path.join(archive_dir, file_name)
532 0cb94105 Michael Hanselmann
533 85f03e0d Michael Hanselmann
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job ID from the name of a job file.

    @param name: the file name, without directory
    @return: the first group matched by _RE_JOB_FILE, or None if the name
        does not match

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      return None
    return match.group(1)
540 fae737ac Michael Hanselmann
541 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @return: the job IDs, as sorted by utils.NiceSort

    """
    job_ids = []
    for file_name in self._ListJobFiles():
      job_ids.append(self._ExtractJobID(file_name))
    return utils.NiceSort(job_ids)
555 911a495b Iustin Pop
556 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
    """Return the names of the job files in the queue directory.

    @return: list of the names returned by L{utils.ListVisibleFiles}
        for L{constants.QUEUE_DIR} which match C{_RE_JOB_FILE}

    """
    job_files = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(filename):
        job_files.append(filename)
    return job_files
559 f1da30e6 Michael Hanselmann
560 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
561 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
562 5685c1a5 Michael Hanselmann
    if job:
563 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
564 5685c1a5 Michael Hanselmann
      return job
565 ac0930b9 Iustin Pop
566 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
567 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
568 f1da30e6 Michael Hanselmann
    try:
569 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
570 f1da30e6 Michael Hanselmann
    except IOError, err:
571 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
572 f1da30e6 Michael Hanselmann
        return None
573 f1da30e6 Michael Hanselmann
      raise
574 f1da30e6 Michael Hanselmann
    try:
575 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
576 f1da30e6 Michael Hanselmann
    finally:
577 f1da30e6 Michael Hanselmann
      fd.close()
578 f1da30e6 Michael Hanselmann
579 94ed59a5 Iustin Pop
    try:
580 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
581 94ed59a5 Iustin Pop
    except Exception, err:
582 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
583 94ed59a5 Iustin Pop
      if filepath == new_path:
584 94ed59a5 Iustin Pop
        # job already archived (future case)
585 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
586 94ed59a5 Iustin Pop
      else:
587 94ed59a5 Iustin Pop
        # non-archived case
588 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
589 94ed59a5 Iustin Pop
        self._RenameFileUnlocked(filepath, new_path)
590 94ed59a5 Iustin Pop
      return None
591 94ed59a5 Iustin Pop
592 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
593 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
594 ac0930b9 Iustin Pop
    return job
595 f1da30e6 Michael Hanselmann
596 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
597 911a495b Iustin Pop
    if not job_ids:
598 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
599 f1da30e6 Michael Hanselmann
600 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
601 f1da30e6 Michael Hanselmann
602 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check whether the queue is marked for drain.

    The flag currently is the existence of the queue drain file, which
    makes it a per-node flag. In the future this can be moved to the
    config file.

    @return: True if L{constants.JOB_QUEUE_DRAIN_FILE} exists, False
        otherwise

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
611 686d7433 Iustin Pop
612 3ccafd0e Iustin Pop
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets or clears the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @param drain_flag: if true, an empty drain file is written; if
        false, the drain file is removed
    @return: always True

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      utils.WriteFile(drain_file, data="", close=True)
    return True
625 3ccafd0e Iustin Pop
626 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    The job gets a new serial, is written to disk, is added to the
    _memcache and finally is handed to the worker pool so that it can
    be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: the ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for drain

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Allocate the identifier and build the job object
    new_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, new_id, ops)

    # The job must be on disk before it is cached or scheduled
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # Hand over to the worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
654 f1da30e6 Michael Hanselmann
655 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Write a job to its queue file and notify waiters.

    The serialized job is passed to
    L{_WriteAndReplicateFileUnlocked}; afterwards everybody waiting
    on the job's C{change} condition is notified.

    @param job: the job to be written

    """
    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_path)
    self._WriteAndReplicateFileUnlocked(job_path, serialized)

    # Wake up anybody waiting for changes of this job
    job.change.notifyAll()
664 dfe57c22 Michael Hanselmann
665 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @return: a tuple (job info, log entries) as soon as the job
        information differs from C{prev_job_info}, there are log
        entries whose first serial differs from C{prev_log_serial},
        or the job is no longer queued, waiting for locks or running;
        L{constants.JOB_NOTCHANGED} if the timeout expired first;
        None if the job could not be loaded and hence no information
        was ever gathered

    """
    logging.debug("Waiting for changes in job %s", job_id)

    # Pre-set the results, so that a job which can't be loaded in the
    # first iteration doesn't lead to unbound local variables below
    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    if job_info is None:
      # The job was never loaded, there is nothing to report
      return None

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
726 dfe57c22 Michael Hanselmann
727 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs whose status still is L{constants.JOB_STATUS_QUEUED} are
    touched: all their opcodes are set to the error status with a
    result message saying that the job was cancelled, and the job is
    written back. Unknown jobs and jobs in any other status are left
    alone.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() != constants.JOB_STATUS_QUEUED:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for queued_op in job.ops:
        queued_op.status = constants.OP_STATUS_ERROR
        queued_op.result = "Job cancelled by request"
    finally:
      # Write the job back even if marking the opcodes failed midway
      self.UpdateJobUnlocked(job)
753 188c5e0a Michael Hanselmann
754 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    The job file is renamed from its queue path to its archive path.
    Nothing is done if the job can't be loaded or if its status is not
    one of canceled, success or error.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    finished_states = (constants.JOB_STATUS_CANCELED,
                       constants.JOB_STATUS_SUCCESS,
                       constants.JOB_STATUS_ERROR)
    if job.CalcStatus() not in finished_states:
      logging.debug("Job %s is not yet done", job.id)
      return

    queue_path = self._GetJobPath(job.id)
    archive_path = self._GetArchivedJobPath(job.id)
    self._RenameFileUnlocked(queue_path, archive_path)

    logging.debug("Successfully archived job %s", job.id)
781 f1da30e6 Michael Hanselmann
782 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is the decorated entry point around
    L{_ArchiveJobUnlocked}, which does the actual work.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @return: whatever L{_ArchiveJobUnlocked} returns

    """
    result = self._ArchiveJobUnlocked(job_id)
    return result
792 07cd723a Iustin Pop
793 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all finished jobs (status success, error
    or canceled) which are older than the age parameter. The age of a
    job is computed from its end timestamp; for jobs that don't have
    an end timestamp, the start timestamp will be considered, and for
    jobs without a start timestamp the received timestamp. The special
    '-1' age will cause archival of all jobs (that are not running or
    queued).

    Jobs which can't be loaded are skipped.

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if not job:
        # _LoadJobUnlocked returns None for files which vanished or
        # couldn't be parsed; one such job must not abort the whole run
        logging.debug("Job %s not found", jid)
        continue

      # CalcStatus returns a job status, hence compare against the
      # JOB_STATUS_* constants, as _ArchiveJobUnlocked does
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      # Use the most recent timestamp which is available
      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared to "now"
      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
826 07cd723a Iustin Pop
827 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
828 e2715f69 Michael Hanselmann
    row = []
829 e2715f69 Michael Hanselmann
    for fname in fields:
830 e2715f69 Michael Hanselmann
      if fname == "id":
831 e2715f69 Michael Hanselmann
        row.append(job.id)
832 e2715f69 Michael Hanselmann
      elif fname == "status":
833 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
834 af30b2fd Michael Hanselmann
      elif fname == "ops":
835 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
836 af30b2fd Michael Hanselmann
      elif fname == "opresult":
837 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
838 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
839 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
840 5b23c34c Iustin Pop
      elif fname == "oplog":
841 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
842 c56ec146 Iustin Pop
      elif fname == "opstart":
843 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
844 c56ec146 Iustin Pop
      elif fname == "opend":
845 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
846 c56ec146 Iustin Pop
      elif fname == "received_ts":
847 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
848 c56ec146 Iustin Pop
      elif fname == "start_ts":
849 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
850 c56ec146 Iustin Pop
      elif fname == "end_ts":
851 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
852 60dd1473 Iustin Pop
      elif fname == "summary":
853 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
854 e2715f69 Michael Hanselmann
      else:
855 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
856 e2715f69 Michael Hanselmann
    return row
857 e2715f69 Michael Hanselmann
858 85f03e0d Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @param job_ids: sequence of job identifiers, or None for all jobs
    @param fields: names of the fields to return
    @return: list with one entry per job: the list of field values,
        or None for a job which couldn't be loaded

    """
    result = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is not None:
        info = self._GetJobInfoUnlocked(job, fields)
      else:
        info = None
      result.append(info)

    return result
877 e2715f69 Michael Hanselmann
878 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    The workers of the worker pool are terminated first; afterwards
    the queue lock is closed and the reference to it is dropped.

    """
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_lock
    queue_lock.Close()
    self._queue_lock = None