Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 686d7433

History | View | Annotate | Download (23.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 72737a7f Iustin Pop
from ganeti.rpc import RpcRunner
49 e2715f69 Michael Hanselmann
50 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
51 e2715f69 Michael Hanselmann
52 498ae1cc Iustin Pop
53 70552c46 Michael Hanselmann
def TimeStampNow():
54 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
55 70552c46 Michael Hanselmann
56 70552c46 Michael Hanselmann
57 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
58 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
59 e2715f69 Michael Hanselmann

60 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
61 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
62 f1048938 Iustin Pop

63 e2715f69 Michael Hanselmann
  """
64 85f03e0d Michael Hanselmann
  def __init__(self, op):
65 85f03e0d Michael Hanselmann
    self.input = op
66 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
67 85f03e0d Michael Hanselmann
    self.result = None
68 85f03e0d Michael Hanselmann
    self.log = []
69 70552c46 Michael Hanselmann
    self.start_timestamp = None
70 70552c46 Michael Hanselmann
    self.end_timestamp = None
71 f1da30e6 Michael Hanselmann
72 f1da30e6 Michael Hanselmann
  @classmethod
73 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
74 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
75 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
76 85f03e0d Michael Hanselmann
    obj.status = state["status"]
77 85f03e0d Michael Hanselmann
    obj.result = state["result"]
78 85f03e0d Michael Hanselmann
    obj.log = state["log"]
79 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
80 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
81 f1da30e6 Michael Hanselmann
    return obj
82 f1da30e6 Michael Hanselmann
83 f1da30e6 Michael Hanselmann
  def Serialize(self):
84 6c5a7090 Michael Hanselmann
    return {
85 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
86 6c5a7090 Michael Hanselmann
      "status": self.status,
87 6c5a7090 Michael Hanselmann
      "result": self.result,
88 6c5a7090 Michael Hanselmann
      "log": self.log,
89 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
90 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
91 6c5a7090 Michael Hanselmann
      }
92 f1048938 Iustin Pop
93 e2715f69 Michael Hanselmann
94 e2715f69 Michael Hanselmann
class _QueuedJob(object):
95 e2715f69 Michael Hanselmann
  """In-memory job representation.
96 e2715f69 Michael Hanselmann

97 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
98 6c5a7090 Michael Hanselmann
  care of by users of this class.
99 e2715f69 Michael Hanselmann

100 e2715f69 Michael Hanselmann
  """
101 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
102 e2715f69 Michael Hanselmann
    if not ops:
103 e2715f69 Michael Hanselmann
      # TODO
104 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
105 e2715f69 Michael Hanselmann
106 85f03e0d Michael Hanselmann
    self.queue = queue
107 f1da30e6 Michael Hanselmann
    self.id = job_id
108 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
109 85f03e0d Michael Hanselmann
    self.run_op_index = -1
110 6c5a7090 Michael Hanselmann
    self.log_serial = 0
111 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
112 c56ec146 Iustin Pop
    self.start_timestamp = None
113 c56ec146 Iustin Pop
    self.end_timestamp = None
114 6c5a7090 Michael Hanselmann
115 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
116 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
117 f1da30e6 Michael Hanselmann
118 f1da30e6 Michael Hanselmann
  @classmethod
119 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
120 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
121 85f03e0d Michael Hanselmann
    obj.queue = queue
122 85f03e0d Michael Hanselmann
    obj.id = state["id"]
123 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
124 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
125 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
126 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
127 6c5a7090 Michael Hanselmann
128 6c5a7090 Michael Hanselmann
    obj.ops = []
129 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
130 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
131 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
132 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
133 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
134 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
135 6c5a7090 Michael Hanselmann
136 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
137 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
138 6c5a7090 Michael Hanselmann
139 f1da30e6 Michael Hanselmann
    return obj
140 f1da30e6 Michael Hanselmann
141 f1da30e6 Michael Hanselmann
  def Serialize(self):
142 f1da30e6 Michael Hanselmann
    return {
143 f1da30e6 Michael Hanselmann
      "id": self.id,
144 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
145 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
146 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
147 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
148 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
149 f1da30e6 Michael Hanselmann
      }
150 f1da30e6 Michael Hanselmann
151 85f03e0d Michael Hanselmann
  def CalcStatus(self):
152 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
153 e2715f69 Michael Hanselmann
154 e2715f69 Michael Hanselmann
    all_success = True
155 85f03e0d Michael Hanselmann
    for op in self.ops:
156 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
157 e2715f69 Michael Hanselmann
        continue
158 e2715f69 Michael Hanselmann
159 e2715f69 Michael Hanselmann
      all_success = False
160 e2715f69 Michael Hanselmann
161 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
162 e2715f69 Michael Hanselmann
        pass
163 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
164 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
165 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
166 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
167 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
168 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
169 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
170 f1da30e6 Michael Hanselmann
        break
171 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
172 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
173 4cb1d919 Michael Hanselmann
        break
174 e2715f69 Michael Hanselmann
175 e2715f69 Michael Hanselmann
    if all_success:
176 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
177 e2715f69 Michael Hanselmann
178 e2715f69 Michael Hanselmann
    return status
179 e2715f69 Michael Hanselmann
180 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
181 6c5a7090 Michael Hanselmann
    if newer_than is None:
182 6c5a7090 Michael Hanselmann
      serial = -1
183 6c5a7090 Michael Hanselmann
    else:
184 6c5a7090 Michael Hanselmann
      serial = newer_than
185 6c5a7090 Michael Hanselmann
186 6c5a7090 Michael Hanselmann
    entries = []
187 6c5a7090 Michael Hanselmann
    for op in self.ops:
188 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
189 6c5a7090 Michael Hanselmann
190 6c5a7090 Michael Hanselmann
    return entries
191 6c5a7090 Michael Hanselmann
192 f1048938 Iustin Pop
193 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
194 e92376d7 Iustin Pop
  def _NotifyStart(self):
195 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
196 e92376d7 Iustin Pop

197 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
198 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
199 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
200 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
201 e92376d7 Iustin Pop

202 e92376d7 Iustin Pop
    """
203 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
204 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
205 e92376d7 Iustin Pop
206 e92376d7 Iustin Pop
    self.queue.acquire()
207 e92376d7 Iustin Pop
    try:
208 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
209 e92376d7 Iustin Pop
    finally:
210 e92376d7 Iustin Pop
      self.queue.release()
211 e92376d7 Iustin Pop
212 85f03e0d Michael Hanselmann
  def RunTask(self, job):
213 e2715f69 Michael Hanselmann
    """Job executor.
214 e2715f69 Michael Hanselmann

215 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
216 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
217 e2715f69 Michael Hanselmann

218 e2715f69 Michael Hanselmann
    """
219 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
220 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
221 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
222 e92376d7 Iustin Pop
    self.queue = queue = job.queue
223 e2715f69 Michael Hanselmann
    try:
224 85f03e0d Michael Hanselmann
      try:
225 85f03e0d Michael Hanselmann
        count = len(job.ops)
226 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
227 85f03e0d Michael Hanselmann
          try:
228 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
229 85f03e0d Michael Hanselmann
230 85f03e0d Michael Hanselmann
            queue.acquire()
231 85f03e0d Michael Hanselmann
            try:
232 85f03e0d Michael Hanselmann
              job.run_op_index = idx
233 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
234 85f03e0d Michael Hanselmann
              op.result = None
235 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
236 c56ec146 Iustin Pop
              if idx == 0: # first opcode
237 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
238 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
239 85f03e0d Michael Hanselmann
240 38206f3c Iustin Pop
              input_opcode = op.input
241 85f03e0d Michael Hanselmann
            finally:
242 85f03e0d Michael Hanselmann
              queue.release()
243 85f03e0d Michael Hanselmann
244 dfe57c22 Michael Hanselmann
            def _Log(*args):
245 6c5a7090 Michael Hanselmann
              """Append a log entry.
246 6c5a7090 Michael Hanselmann

247 6c5a7090 Michael Hanselmann
              """
248 6c5a7090 Michael Hanselmann
              assert len(args) < 3
249 6c5a7090 Michael Hanselmann
250 6c5a7090 Michael Hanselmann
              if len(args) == 1:
251 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
252 6c5a7090 Michael Hanselmann
                log_msg = args[0]
253 6c5a7090 Michael Hanselmann
              else:
254 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
255 6c5a7090 Michael Hanselmann
256 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
257 6c5a7090 Michael Hanselmann
              # precision.
258 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
259 dfe57c22 Michael Hanselmann
260 6c5a7090 Michael Hanselmann
              queue.acquire()
261 dfe57c22 Michael Hanselmann
              try:
262 6c5a7090 Michael Hanselmann
                job.log_serial += 1
263 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
264 6c5a7090 Michael Hanselmann
265 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
266 dfe57c22 Michael Hanselmann
              finally:
267 6c5a7090 Michael Hanselmann
                queue.release()
268 dfe57c22 Michael Hanselmann
269 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
270 e92376d7 Iustin Pop
            self.opcode = op
271 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
272 85f03e0d Michael Hanselmann
273 85f03e0d Michael Hanselmann
            queue.acquire()
274 85f03e0d Michael Hanselmann
            try:
275 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
276 85f03e0d Michael Hanselmann
              op.result = result
277 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
278 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
279 85f03e0d Michael Hanselmann
            finally:
280 85f03e0d Michael Hanselmann
              queue.release()
281 85f03e0d Michael Hanselmann
282 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
283 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
284 85f03e0d Michael Hanselmann
          except Exception, err:
285 85f03e0d Michael Hanselmann
            queue.acquire()
286 85f03e0d Michael Hanselmann
            try:
287 85f03e0d Michael Hanselmann
              try:
288 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
289 85f03e0d Michael Hanselmann
                op.result = str(err)
290 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
291 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
292 85f03e0d Michael Hanselmann
              finally:
293 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
294 85f03e0d Michael Hanselmann
            finally:
295 85f03e0d Michael Hanselmann
              queue.release()
296 85f03e0d Michael Hanselmann
            raise
297 85f03e0d Michael Hanselmann
298 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
299 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
300 85f03e0d Michael Hanselmann
      except:
301 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
302 e2715f69 Michael Hanselmann
    finally:
303 85f03e0d Michael Hanselmann
      queue.acquire()
304 85f03e0d Michael Hanselmann
      try:
305 65548ed5 Michael Hanselmann
        try:
306 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
307 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
308 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
309 65548ed5 Michael Hanselmann
        finally:
310 65548ed5 Michael Hanselmann
          job_id = job.id
311 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
312 85f03e0d Michael Hanselmann
      finally:
313 85f03e0d Michael Hanselmann
        queue.release()
314 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
315 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
316 e2715f69 Michael Hanselmann
317 e2715f69 Michael Hanselmann
318 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
319 5bdce580 Michael Hanselmann
  def __init__(self, queue):
320 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
321 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
322 5bdce580 Michael Hanselmann
    self.queue = queue
323 e2715f69 Michael Hanselmann
324 e2715f69 Michael Hanselmann
325 85f03e0d Michael Hanselmann
class JobQueue(object):
326 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
327 f1da30e6 Michael Hanselmann
328 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
329 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
330 db37da70 Michael Hanselmann

331 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
332 db37da70 Michael Hanselmann
    usually called from other classes.
333 db37da70 Michael Hanselmann

334 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
335 db37da70 Michael Hanselmann

336 db37da70 Michael Hanselmann
    Example:
337 db37da70 Michael Hanselmann
      @utils.LockedMethod
338 db37da70 Michael Hanselmann
      @_RequireOpenQueue
339 db37da70 Michael Hanselmann
      def Example(self):
340 db37da70 Michael Hanselmann
        pass
341 db37da70 Michael Hanselmann

342 db37da70 Michael Hanselmann
    """
343 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
344 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
345 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
346 db37da70 Michael Hanselmann
    return wrapper
347 db37da70 Michael Hanselmann
348 85f03e0d Michael Hanselmann
  def __init__(self, context):
349 5bdce580 Michael Hanselmann
    self.context = context
350 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
351 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
352 f1da30e6 Michael Hanselmann
353 85f03e0d Michael Hanselmann
    # Locking
354 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
355 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
356 85f03e0d Michael Hanselmann
    self.release = self._lock.release
357 85f03e0d Michael Hanselmann
358 04ab05ce Michael Hanselmann
    # Initialize
359 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
360 f1da30e6 Michael Hanselmann
361 04ab05ce Michael Hanselmann
    # Read serial file
362 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
363 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
364 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
365 c4beba1c Iustin Pop
366 23752136 Michael Hanselmann
    # Get initial list of nodes
367 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
368 8e00939c Michael Hanselmann
369 8e00939c Michael Hanselmann
    # Remove master node
370 8e00939c Michael Hanselmann
    try:
371 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
372 8e00939c Michael Hanselmann
    except ValueError:
373 8e00939c Michael Hanselmann
      pass
374 23752136 Michael Hanselmann
375 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
376 23752136 Michael Hanselmann
377 85f03e0d Michael Hanselmann
    # Setup worker pool
378 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
379 85f03e0d Michael Hanselmann
380 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
381 85f03e0d Michael Hanselmann
    # we're still doing our work.
382 85f03e0d Michael Hanselmann
    self.acquire()
383 85f03e0d Michael Hanselmann
    try:
384 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
385 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
386 85f03e0d Michael Hanselmann
387 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
388 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
389 85f03e0d Michael Hanselmann
390 e92376d7 Iustin Pop
        elif status in (constants.JOB_STATUS_RUNNING,
391 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
392 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
393 85f03e0d Michael Hanselmann
          try:
394 85f03e0d Michael Hanselmann
            for op in job.ops:
395 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
396 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
397 85f03e0d Michael Hanselmann
          finally:
398 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
399 85f03e0d Michael Hanselmann
    finally:
400 85f03e0d Michael Hanselmann
      self.release()
401 85f03e0d Michael Hanselmann
402 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
403 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
404 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
405 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
406 23752136 Michael Hanselmann
407 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
408 72737a7f Iustin Pop
    RpcRunner.call_jobqueue_purge(node_name)
409 23752136 Michael Hanselmann
410 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
411 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
412 23752136 Michael Hanselmann
413 d2e03a33 Michael Hanselmann
    # Upload current serial file
414 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
415 d2e03a33 Michael Hanselmann
416 d2e03a33 Michael Hanselmann
    for file_name in files:
417 9f774ee8 Michael Hanselmann
      # Read file content
418 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
419 9f774ee8 Michael Hanselmann
      try:
420 9f774ee8 Michael Hanselmann
        content = fd.read()
421 9f774ee8 Michael Hanselmann
      finally:
422 9f774ee8 Michael Hanselmann
        fd.close()
423 9f774ee8 Michael Hanselmann
424 72737a7f Iustin Pop
      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
425 d2e03a33 Michael Hanselmann
      if not result[node_name]:
426 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
427 d2e03a33 Michael Hanselmann
428 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
429 d2e03a33 Michael Hanselmann
430 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
431 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
432 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
433 23752136 Michael Hanselmann
    try:
434 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
435 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
436 d2e03a33 Michael Hanselmann
    except KeyError:
437 23752136 Michael Hanselmann
      pass
438 23752136 Michael Hanselmann
439 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
440 e74798c1 Michael Hanselmann
    failed = []
441 e74798c1 Michael Hanselmann
    success = []
442 e74798c1 Michael Hanselmann
443 e74798c1 Michael Hanselmann
    for node in nodes:
444 e74798c1 Michael Hanselmann
      if result[node]:
445 e74798c1 Michael Hanselmann
        success.append(node)
446 e74798c1 Michael Hanselmann
      else:
447 e74798c1 Michael Hanselmann
        failed.append(node)
448 e74798c1 Michael Hanselmann
449 e74798c1 Michael Hanselmann
    if failed:
450 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
451 e74798c1 Michael Hanselmann
452 e74798c1 Michael Hanselmann
    # +1 for the master node
453 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
454 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
455 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
456 e74798c1 Michael Hanselmann
457 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
458 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
459 8e00939c Michael Hanselmann

460 8e00939c Michael Hanselmann
    """
461 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
462 8e00939c Michael Hanselmann
463 72737a7f Iustin Pop
    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
464 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
465 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
466 23752136 Michael Hanselmann
467 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
468 abc1f2ce Michael Hanselmann
    os.rename(old, new)
469 abc1f2ce Michael Hanselmann
470 72737a7f Iustin Pop
    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
471 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
472 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
473 abc1f2ce Michael Hanselmann
474 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
475 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
476 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
477 85f03e0d Michael Hanselmann
    if job_id < 0:
478 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
479 85f03e0d Michael Hanselmann
480 85f03e0d Michael Hanselmann
    return str(job_id)
481 85f03e0d Michael Hanselmann
482 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
483 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
484 f1da30e6 Michael Hanselmann

485 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
486 f1da30e6 Michael Hanselmann

487 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
488 f1da30e6 Michael Hanselmann

489 f1da30e6 Michael Hanselmann
    """
490 f1da30e6 Michael Hanselmann
    # New number
491 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
492 f1da30e6 Michael Hanselmann
493 f1da30e6 Michael Hanselmann
    # Write to file
494 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
495 23752136 Michael Hanselmann
                                        "%s\n" % serial)
496 f1da30e6 Michael Hanselmann
497 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
498 f1da30e6 Michael Hanselmann
    self._last_serial = serial
499 f1da30e6 Michael Hanselmann
500 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
501 f1da30e6 Michael Hanselmann
502 85f03e0d Michael Hanselmann
  @staticmethod
503 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
504 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
505 f1da30e6 Michael Hanselmann
506 85f03e0d Michael Hanselmann
  @staticmethod
507 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
508 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
509 0cb94105 Michael Hanselmann
510 85f03e0d Michael Hanselmann
  @classmethod
511 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
512 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
513 fae737ac Michael Hanselmann
    if m:
514 fae737ac Michael Hanselmann
      return m.group(1)
515 fae737ac Michael Hanselmann
    else:
516 fae737ac Michael Hanselmann
      return None
517 fae737ac Michael Hanselmann
518 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
519 911a495b Iustin Pop
    """Return all known job IDs.
520 911a495b Iustin Pop

521 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
522 911a495b Iustin Pop
    included. Currently this argument is unused.
523 911a495b Iustin Pop

524 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
525 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
526 ac0930b9 Iustin Pop
    extra IDs).
527 ac0930b9 Iustin Pop

528 911a495b Iustin Pop
    """
529 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
530 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
531 f0d874fe Iustin Pop
    return jlist
532 911a495b Iustin Pop
533 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
534 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
535 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
536 f1da30e6 Michael Hanselmann
537 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
538 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
539 5685c1a5 Michael Hanselmann
    if job:
540 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
541 5685c1a5 Michael Hanselmann
      return job
542 ac0930b9 Iustin Pop
543 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
544 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
545 f1da30e6 Michael Hanselmann
    try:
546 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
547 f1da30e6 Michael Hanselmann
    except IOError, err:
548 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
549 f1da30e6 Michael Hanselmann
        return None
550 f1da30e6 Michael Hanselmann
      raise
551 f1da30e6 Michael Hanselmann
    try:
552 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
553 f1da30e6 Michael Hanselmann
    finally:
554 f1da30e6 Michael Hanselmann
      fd.close()
555 f1da30e6 Michael Hanselmann
556 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
557 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
558 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
559 ac0930b9 Iustin Pop
    return job
560 f1da30e6 Michael Hanselmann
561 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
562 911a495b Iustin Pop
    if not job_ids:
563 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
564 f1da30e6 Michael Hanselmann
565 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
566 f1da30e6 Michael Hanselmann
567 686d7433 Iustin Pop
  @staticmethod
568 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
569 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
570 686d7433 Iustin Pop

571 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
572 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
573 686d7433 Iustin Pop

574 686d7433 Iustin Pop
    """
575 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
576 686d7433 Iustin Pop
577 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
578 db37da70 Michael Hanselmann
  @_RequireOpenQueue
579 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
580 85f03e0d Michael Hanselmann
    """Create and store a new job.
581 f1da30e6 Michael Hanselmann

582 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
583 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
584 c3f0a12f Iustin Pop

585 c3f0a12f Iustin Pop
    @type ops: list
586 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
587 c3f0a12f Iustin Pop

588 c3f0a12f Iustin Pop
    """
589 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
590 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
591 f1da30e6 Michael Hanselmann
    # Get job identifier
592 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
593 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
594 f1da30e6 Michael Hanselmann
595 f1da30e6 Michael Hanselmann
    # Write to disk
596 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
597 f1da30e6 Michael Hanselmann
598 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
599 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
600 ac0930b9 Iustin Pop
601 85f03e0d Michael Hanselmann
    # Add to worker pool
602 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
603 85f03e0d Michael Hanselmann
604 85f03e0d Michael Hanselmann
    return job.id
605 f1da30e6 Michael Hanselmann
606 db37da70 Michael Hanselmann
  @_RequireOpenQueue
607 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
608 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
609 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
610 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
611 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
612 ac0930b9 Iustin Pop
613 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
614 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
615 dfe57c22 Michael Hanselmann
616 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
617 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
618 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
619 5c735209 Iustin Pop
                        timeout):
620 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
621 6c5a7090 Michael Hanselmann

622 6c5a7090 Michael Hanselmann
    @type job_id: string
623 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
624 6c5a7090 Michael Hanselmann
    @type fields: list of strings
625 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
626 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
627 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
628 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
629 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
630 5c735209 Iustin Pop
    @type timeout: float
631 5c735209 Iustin Pop
    @param timeout: maximum time to wait
632 6c5a7090 Michael Hanselmann

633 6c5a7090 Michael Hanselmann
    """
634 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
635 5c735209 Iustin Pop
    end_time = time.time() + timeout
636 dfe57c22 Michael Hanselmann
    while True:
637 5c735209 Iustin Pop
      delta_time = end_time - time.time()
638 5c735209 Iustin Pop
      if delta_time < 0:
639 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
640 5c735209 Iustin Pop
641 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
642 6c5a7090 Michael Hanselmann
      if not job:
643 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
644 6c5a7090 Michael Hanselmann
        break
645 dfe57c22 Michael Hanselmann
646 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
647 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
648 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
649 dfe57c22 Michael Hanselmann
650 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
651 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
652 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
653 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
654 dfe57c22 Michael Hanselmann
      # significantly different.
655 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
656 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
657 dfe57c22 Michael Hanselmann
658 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
659 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
660 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
661 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
662 6c5a7090 Michael Hanselmann
        # no changes.
663 dfe57c22 Michael Hanselmann
        break
664 dfe57c22 Michael Hanselmann
665 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
666 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
667 6c5a7090 Michael Hanselmann
        break
668 6c5a7090 Michael Hanselmann
669 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
670 6c5a7090 Michael Hanselmann
671 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
672 5c735209 Iustin Pop
      job.change.wait(delta_time)
673 dfe57c22 Michael Hanselmann
674 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
675 dfe57c22 Michael Hanselmann
676 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
677 dfe57c22 Michael Hanselmann
678 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
679 db37da70 Michael Hanselmann
  @_RequireOpenQueue
680 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
681 188c5e0a Michael Hanselmann
    """Cancels a job.
682 188c5e0a Michael Hanselmann

683 188c5e0a Michael Hanselmann
    @type job_id: string
684 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
685 188c5e0a Michael Hanselmann

686 188c5e0a Michael Hanselmann
    """
687 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
688 188c5e0a Michael Hanselmann
689 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
690 188c5e0a Michael Hanselmann
    if not job:
691 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
692 188c5e0a Michael Hanselmann
      return
693 188c5e0a Michael Hanselmann
694 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
695 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
696 188c5e0a Michael Hanselmann
      return
697 188c5e0a Michael Hanselmann
698 85f03e0d Michael Hanselmann
    try:
699 85f03e0d Michael Hanselmann
      for op in job.ops:
700 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
701 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
702 85f03e0d Michael Hanselmann
    finally:
703 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
704 188c5e0a Michael Hanselmann
705 db37da70 Michael Hanselmann
  @_RequireOpenQueue
706 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
707 c609f802 Michael Hanselmann
    """Archives a job.
708 c609f802 Michael Hanselmann

709 c609f802 Michael Hanselmann
    @type job_id: string
710 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
711 c609f802 Michael Hanselmann

712 c609f802 Michael Hanselmann
    """
713 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
714 c609f802 Michael Hanselmann
715 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
716 c609f802 Michael Hanselmann
    if not job:
717 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
718 c609f802 Michael Hanselmann
      return
719 c609f802 Michael Hanselmann
720 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
721 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
722 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
723 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
724 c609f802 Michael Hanselmann
      return
725 c609f802 Michael Hanselmann
726 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
727 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
728 c609f802 Michael Hanselmann
729 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
730 c609f802 Michael Hanselmann
731 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
732 f1da30e6 Michael Hanselmann
733 07cd723a Iustin Pop
  @utils.LockedMethod
734 07cd723a Iustin Pop
  @_RequireOpenQueue
735 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
736 07cd723a Iustin Pop
    """Archives a job.
737 07cd723a Iustin Pop

738 07cd723a Iustin Pop
    @type job_id: string
739 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
740 07cd723a Iustin Pop

741 07cd723a Iustin Pop
    """
742 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
743 07cd723a Iustin Pop
744 07cd723a Iustin Pop
  @utils.LockedMethod
745 07cd723a Iustin Pop
  @_RequireOpenQueue
746 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
747 07cd723a Iustin Pop
    """Archives all jobs based on age.
748 07cd723a Iustin Pop

749 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
750 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
751 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
752 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
753 07cd723a Iustin Pop

754 07cd723a Iustin Pop
    @type age: int
755 07cd723a Iustin Pop
    @param age: the minimum age in seconds
756 07cd723a Iustin Pop

757 07cd723a Iustin Pop
    """
758 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
759 07cd723a Iustin Pop
760 07cd723a Iustin Pop
    now = time.time()
761 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
762 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
763 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
764 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
765 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
766 07cd723a Iustin Pop
        continue
767 07cd723a Iustin Pop
      if job.end_timestamp is None:
768 07cd723a Iustin Pop
        if job.start_timestamp is None:
769 07cd723a Iustin Pop
          job_age = job.received_timestamp
770 07cd723a Iustin Pop
        else:
771 07cd723a Iustin Pop
          job_age = job.start_timestamp
772 07cd723a Iustin Pop
      else:
773 07cd723a Iustin Pop
        job_age = job.end_timestamp
774 07cd723a Iustin Pop
775 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
776 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
777 07cd723a Iustin Pop
778 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
779 e2715f69 Michael Hanselmann
    row = []
780 e2715f69 Michael Hanselmann
    for fname in fields:
781 e2715f69 Michael Hanselmann
      if fname == "id":
782 e2715f69 Michael Hanselmann
        row.append(job.id)
783 e2715f69 Michael Hanselmann
      elif fname == "status":
784 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
785 af30b2fd Michael Hanselmann
      elif fname == "ops":
786 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
787 af30b2fd Michael Hanselmann
      elif fname == "opresult":
788 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
789 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
790 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
791 5b23c34c Iustin Pop
      elif fname == "oplog":
792 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
793 c56ec146 Iustin Pop
      elif fname == "opstart":
794 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
795 c56ec146 Iustin Pop
      elif fname == "opend":
796 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
797 c56ec146 Iustin Pop
      elif fname == "received_ts":
798 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
799 c56ec146 Iustin Pop
      elif fname == "start_ts":
800 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
801 c56ec146 Iustin Pop
      elif fname == "end_ts":
802 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
803 60dd1473 Iustin Pop
      elif fname == "summary":
804 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
805 e2715f69 Michael Hanselmann
      else:
806 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
807 e2715f69 Michael Hanselmann
    return row
808 e2715f69 Michael Hanselmann
809 85f03e0d Michael Hanselmann
  @utils.LockedMethod
810 db37da70 Michael Hanselmann
  @_RequireOpenQueue
811 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
812 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
813 e2715f69 Michael Hanselmann

814 e2715f69 Michael Hanselmann
    Args:
815 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
816 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
817 e2715f69 Michael Hanselmann

818 e2715f69 Michael Hanselmann
    """
819 85f03e0d Michael Hanselmann
    jobs = []
820 e2715f69 Michael Hanselmann
821 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
822 85f03e0d Michael Hanselmann
      if job is None:
823 85f03e0d Michael Hanselmann
        jobs.append(None)
824 85f03e0d Michael Hanselmann
      else:
825 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
826 e2715f69 Michael Hanselmann
827 85f03e0d Michael Hanselmann
    return jobs
828 e2715f69 Michael Hanselmann
829 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
830 db37da70 Michael Hanselmann
  @_RequireOpenQueue
831 e2715f69 Michael Hanselmann
  def Shutdown(self):
832 e2715f69 Michael Hanselmann
    """Stops the job queue.
833 e2715f69 Michael Hanselmann

834 e2715f69 Michael Hanselmann
    """
835 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
836 85f03e0d Michael Hanselmann
837 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
838 04ab05ce Michael Hanselmann
    self._queue_lock = None