Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 3ccafd0e

History | View | Annotate | Download (24.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 72737a7f Iustin Pop
from ganeti.rpc import RpcRunner
49 e2715f69 Michael Hanselmann
50 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
51 e2715f69 Michael Hanselmann
52 498ae1cc Iustin Pop
53 70552c46 Michael Hanselmann
def TimeStampNow():
54 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
55 70552c46 Michael Hanselmann
56 70552c46 Michael Hanselmann
57 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
58 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
59 e2715f69 Michael Hanselmann

60 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
61 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
62 f1048938 Iustin Pop

63 e2715f69 Michael Hanselmann
  """
64 85f03e0d Michael Hanselmann
  def __init__(self, op):
65 85f03e0d Michael Hanselmann
    self.input = op
66 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
67 85f03e0d Michael Hanselmann
    self.result = None
68 85f03e0d Michael Hanselmann
    self.log = []
69 70552c46 Michael Hanselmann
    self.start_timestamp = None
70 70552c46 Michael Hanselmann
    self.end_timestamp = None
71 f1da30e6 Michael Hanselmann
72 f1da30e6 Michael Hanselmann
  @classmethod
73 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
74 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
75 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
76 85f03e0d Michael Hanselmann
    obj.status = state["status"]
77 85f03e0d Michael Hanselmann
    obj.result = state["result"]
78 85f03e0d Michael Hanselmann
    obj.log = state["log"]
79 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
80 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
81 f1da30e6 Michael Hanselmann
    return obj
82 f1da30e6 Michael Hanselmann
83 f1da30e6 Michael Hanselmann
  def Serialize(self):
84 6c5a7090 Michael Hanselmann
    return {
85 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
86 6c5a7090 Michael Hanselmann
      "status": self.status,
87 6c5a7090 Michael Hanselmann
      "result": self.result,
88 6c5a7090 Michael Hanselmann
      "log": self.log,
89 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
90 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
91 6c5a7090 Michael Hanselmann
      }
92 f1048938 Iustin Pop
93 e2715f69 Michael Hanselmann
94 e2715f69 Michael Hanselmann
class _QueuedJob(object):
95 e2715f69 Michael Hanselmann
  """In-memory job representation.
96 e2715f69 Michael Hanselmann

97 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
98 6c5a7090 Michael Hanselmann
  care of by users of this class.
99 e2715f69 Michael Hanselmann

100 e2715f69 Michael Hanselmann
  """
101 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
102 e2715f69 Michael Hanselmann
    if not ops:
103 e2715f69 Michael Hanselmann
      # TODO
104 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
105 e2715f69 Michael Hanselmann
106 85f03e0d Michael Hanselmann
    self.queue = queue
107 f1da30e6 Michael Hanselmann
    self.id = job_id
108 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
109 85f03e0d Michael Hanselmann
    self.run_op_index = -1
110 6c5a7090 Michael Hanselmann
    self.log_serial = 0
111 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
112 c56ec146 Iustin Pop
    self.start_timestamp = None
113 c56ec146 Iustin Pop
    self.end_timestamp = None
114 6c5a7090 Michael Hanselmann
115 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
116 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
117 f1da30e6 Michael Hanselmann
118 f1da30e6 Michael Hanselmann
  @classmethod
119 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
120 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
121 85f03e0d Michael Hanselmann
    obj.queue = queue
122 85f03e0d Michael Hanselmann
    obj.id = state["id"]
123 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
124 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
125 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
126 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
127 6c5a7090 Michael Hanselmann
128 6c5a7090 Michael Hanselmann
    obj.ops = []
129 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
130 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
131 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
132 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
133 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
134 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
135 6c5a7090 Michael Hanselmann
136 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
137 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
138 6c5a7090 Michael Hanselmann
139 f1da30e6 Michael Hanselmann
    return obj
140 f1da30e6 Michael Hanselmann
141 f1da30e6 Michael Hanselmann
  def Serialize(self):
142 f1da30e6 Michael Hanselmann
    return {
143 f1da30e6 Michael Hanselmann
      "id": self.id,
144 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
145 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
146 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
147 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
148 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
149 f1da30e6 Michael Hanselmann
      }
150 f1da30e6 Michael Hanselmann
151 85f03e0d Michael Hanselmann
  def CalcStatus(self):
152 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
153 e2715f69 Michael Hanselmann
154 e2715f69 Michael Hanselmann
    all_success = True
155 85f03e0d Michael Hanselmann
    for op in self.ops:
156 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
157 e2715f69 Michael Hanselmann
        continue
158 e2715f69 Michael Hanselmann
159 e2715f69 Michael Hanselmann
      all_success = False
160 e2715f69 Michael Hanselmann
161 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
162 e2715f69 Michael Hanselmann
        pass
163 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
164 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
165 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
166 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
167 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
168 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
169 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
170 f1da30e6 Michael Hanselmann
        break
171 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
172 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
173 4cb1d919 Michael Hanselmann
        break
174 e2715f69 Michael Hanselmann
175 e2715f69 Michael Hanselmann
    if all_success:
176 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
177 e2715f69 Michael Hanselmann
178 e2715f69 Michael Hanselmann
    return status
179 e2715f69 Michael Hanselmann
180 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
181 6c5a7090 Michael Hanselmann
    if newer_than is None:
182 6c5a7090 Michael Hanselmann
      serial = -1
183 6c5a7090 Michael Hanselmann
    else:
184 6c5a7090 Michael Hanselmann
      serial = newer_than
185 6c5a7090 Michael Hanselmann
186 6c5a7090 Michael Hanselmann
    entries = []
187 6c5a7090 Michael Hanselmann
    for op in self.ops:
188 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
189 6c5a7090 Michael Hanselmann
190 6c5a7090 Michael Hanselmann
    return entries
191 6c5a7090 Michael Hanselmann
192 f1048938 Iustin Pop
193 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
194 e92376d7 Iustin Pop
  def _NotifyStart(self):
195 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
196 e92376d7 Iustin Pop

197 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
198 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
199 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
200 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
201 e92376d7 Iustin Pop

202 e92376d7 Iustin Pop
    """
203 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
204 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
205 e92376d7 Iustin Pop
206 e92376d7 Iustin Pop
    self.queue.acquire()
207 e92376d7 Iustin Pop
    try:
208 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
209 e92376d7 Iustin Pop
    finally:
210 e92376d7 Iustin Pop
      self.queue.release()
211 e92376d7 Iustin Pop
212 85f03e0d Michael Hanselmann
  def RunTask(self, job):
213 e2715f69 Michael Hanselmann
    """Job executor.
214 e2715f69 Michael Hanselmann

215 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
216 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
217 e2715f69 Michael Hanselmann

218 e2715f69 Michael Hanselmann
    """
219 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
220 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
221 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
222 e92376d7 Iustin Pop
    self.queue = queue = job.queue
223 e2715f69 Michael Hanselmann
    try:
224 85f03e0d Michael Hanselmann
      try:
225 85f03e0d Michael Hanselmann
        count = len(job.ops)
226 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
227 85f03e0d Michael Hanselmann
          try:
228 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
229 85f03e0d Michael Hanselmann
230 85f03e0d Michael Hanselmann
            queue.acquire()
231 85f03e0d Michael Hanselmann
            try:
232 85f03e0d Michael Hanselmann
              job.run_op_index = idx
233 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
234 85f03e0d Michael Hanselmann
              op.result = None
235 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
236 c56ec146 Iustin Pop
              if idx == 0: # first opcode
237 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
238 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
239 85f03e0d Michael Hanselmann
240 38206f3c Iustin Pop
              input_opcode = op.input
241 85f03e0d Michael Hanselmann
            finally:
242 85f03e0d Michael Hanselmann
              queue.release()
243 85f03e0d Michael Hanselmann
244 dfe57c22 Michael Hanselmann
            def _Log(*args):
245 6c5a7090 Michael Hanselmann
              """Append a log entry.
246 6c5a7090 Michael Hanselmann

247 6c5a7090 Michael Hanselmann
              """
248 6c5a7090 Michael Hanselmann
              assert len(args) < 3
249 6c5a7090 Michael Hanselmann
250 6c5a7090 Michael Hanselmann
              if len(args) == 1:
251 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
252 6c5a7090 Michael Hanselmann
                log_msg = args[0]
253 6c5a7090 Michael Hanselmann
              else:
254 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
255 6c5a7090 Michael Hanselmann
256 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
257 6c5a7090 Michael Hanselmann
              # precision.
258 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
259 dfe57c22 Michael Hanselmann
260 6c5a7090 Michael Hanselmann
              queue.acquire()
261 dfe57c22 Michael Hanselmann
              try:
262 6c5a7090 Michael Hanselmann
                job.log_serial += 1
263 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
264 6c5a7090 Michael Hanselmann
265 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
266 dfe57c22 Michael Hanselmann
              finally:
267 6c5a7090 Michael Hanselmann
                queue.release()
268 dfe57c22 Michael Hanselmann
269 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
270 e92376d7 Iustin Pop
            self.opcode = op
271 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
272 85f03e0d Michael Hanselmann
273 85f03e0d Michael Hanselmann
            queue.acquire()
274 85f03e0d Michael Hanselmann
            try:
275 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
276 85f03e0d Michael Hanselmann
              op.result = result
277 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
278 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
279 85f03e0d Michael Hanselmann
            finally:
280 85f03e0d Michael Hanselmann
              queue.release()
281 85f03e0d Michael Hanselmann
282 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
283 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
284 85f03e0d Michael Hanselmann
          except Exception, err:
285 85f03e0d Michael Hanselmann
            queue.acquire()
286 85f03e0d Michael Hanselmann
            try:
287 85f03e0d Michael Hanselmann
              try:
288 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
289 85f03e0d Michael Hanselmann
                op.result = str(err)
290 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
291 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
292 85f03e0d Michael Hanselmann
              finally:
293 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
294 85f03e0d Michael Hanselmann
            finally:
295 85f03e0d Michael Hanselmann
              queue.release()
296 85f03e0d Michael Hanselmann
            raise
297 85f03e0d Michael Hanselmann
298 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
299 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
300 85f03e0d Michael Hanselmann
      except:
301 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
302 e2715f69 Michael Hanselmann
    finally:
303 85f03e0d Michael Hanselmann
      queue.acquire()
304 85f03e0d Michael Hanselmann
      try:
305 65548ed5 Michael Hanselmann
        try:
306 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
307 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
308 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
309 65548ed5 Michael Hanselmann
        finally:
310 65548ed5 Michael Hanselmann
          job_id = job.id
311 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
312 85f03e0d Michael Hanselmann
      finally:
313 85f03e0d Michael Hanselmann
        queue.release()
314 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
315 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
316 e2715f69 Michael Hanselmann
317 e2715f69 Michael Hanselmann
318 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
319 5bdce580 Michael Hanselmann
  def __init__(self, queue):
320 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
321 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
322 5bdce580 Michael Hanselmann
    self.queue = queue
323 e2715f69 Michael Hanselmann
324 e2715f69 Michael Hanselmann
325 85f03e0d Michael Hanselmann
class JobQueue(object):
326 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
327 f1da30e6 Michael Hanselmann
328 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
329 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
330 db37da70 Michael Hanselmann

331 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
332 db37da70 Michael Hanselmann
    usually called from other classes.
333 db37da70 Michael Hanselmann

334 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
335 db37da70 Michael Hanselmann

336 db37da70 Michael Hanselmann
    Example:
337 db37da70 Michael Hanselmann
      @utils.LockedMethod
338 db37da70 Michael Hanselmann
      @_RequireOpenQueue
339 db37da70 Michael Hanselmann
      def Example(self):
340 db37da70 Michael Hanselmann
        pass
341 db37da70 Michael Hanselmann

342 db37da70 Michael Hanselmann
    """
343 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
344 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
345 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
346 db37da70 Michael Hanselmann
    return wrapper
347 db37da70 Michael Hanselmann
348 85f03e0d Michael Hanselmann
  def __init__(self, context):
349 5bdce580 Michael Hanselmann
    self.context = context
350 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
351 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
352 f1da30e6 Michael Hanselmann
353 85f03e0d Michael Hanselmann
    # Locking
354 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
355 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
356 85f03e0d Michael Hanselmann
    self.release = self._lock.release
357 85f03e0d Michael Hanselmann
358 04ab05ce Michael Hanselmann
    # Initialize
359 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
360 f1da30e6 Michael Hanselmann
361 04ab05ce Michael Hanselmann
    # Read serial file
362 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
363 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
364 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
365 c4beba1c Iustin Pop
366 23752136 Michael Hanselmann
    # Get initial list of nodes
367 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
368 8e00939c Michael Hanselmann
369 8e00939c Michael Hanselmann
    # Remove master node
370 8e00939c Michael Hanselmann
    try:
371 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
372 8e00939c Michael Hanselmann
    except ValueError:
373 8e00939c Michael Hanselmann
      pass
374 23752136 Michael Hanselmann
375 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
376 23752136 Michael Hanselmann
377 85f03e0d Michael Hanselmann
    # Setup worker pool
378 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
379 85f03e0d Michael Hanselmann
380 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
381 85f03e0d Michael Hanselmann
    # we're still doing our work.
382 85f03e0d Michael Hanselmann
    self.acquire()
383 85f03e0d Michael Hanselmann
    try:
384 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
385 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
386 85f03e0d Michael Hanselmann
387 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
388 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
389 85f03e0d Michael Hanselmann
390 e92376d7 Iustin Pop
        elif status in (constants.JOB_STATUS_RUNNING,
391 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
392 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
393 85f03e0d Michael Hanselmann
          try:
394 85f03e0d Michael Hanselmann
            for op in job.ops:
395 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
396 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
397 85f03e0d Michael Hanselmann
          finally:
398 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
399 85f03e0d Michael Hanselmann
    finally:
400 85f03e0d Michael Hanselmann
      self.release()
401 85f03e0d Michael Hanselmann
402 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
403 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
404 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
405 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
406 23752136 Michael Hanselmann
407 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
408 72737a7f Iustin Pop
    RpcRunner.call_jobqueue_purge(node_name)
409 23752136 Michael Hanselmann
410 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
411 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
412 23752136 Michael Hanselmann
413 d2e03a33 Michael Hanselmann
    # Upload current serial file
414 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
415 d2e03a33 Michael Hanselmann
416 d2e03a33 Michael Hanselmann
    for file_name in files:
417 9f774ee8 Michael Hanselmann
      # Read file content
418 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
419 9f774ee8 Michael Hanselmann
      try:
420 9f774ee8 Michael Hanselmann
        content = fd.read()
421 9f774ee8 Michael Hanselmann
      finally:
422 9f774ee8 Michael Hanselmann
        fd.close()
423 9f774ee8 Michael Hanselmann
424 72737a7f Iustin Pop
      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
425 d2e03a33 Michael Hanselmann
      if not result[node_name]:
426 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
427 d2e03a33 Michael Hanselmann
428 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
429 d2e03a33 Michael Hanselmann
430 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
431 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
432 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
433 23752136 Michael Hanselmann
    try:
434 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
435 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
436 d2e03a33 Michael Hanselmann
    except KeyError:
437 23752136 Michael Hanselmann
      pass
438 23752136 Michael Hanselmann
439 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
440 e74798c1 Michael Hanselmann
    failed = []
441 e74798c1 Michael Hanselmann
    success = []
442 e74798c1 Michael Hanselmann
443 e74798c1 Michael Hanselmann
    for node in nodes:
444 e74798c1 Michael Hanselmann
      if result[node]:
445 e74798c1 Michael Hanselmann
        success.append(node)
446 e74798c1 Michael Hanselmann
      else:
447 e74798c1 Michael Hanselmann
        failed.append(node)
448 e74798c1 Michael Hanselmann
449 e74798c1 Michael Hanselmann
    if failed:
450 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
451 e74798c1 Michael Hanselmann
452 e74798c1 Michael Hanselmann
    # +1 for the master node
453 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
454 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
455 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
456 e74798c1 Michael Hanselmann
457 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
458 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
459 8e00939c Michael Hanselmann

460 8e00939c Michael Hanselmann
    """
461 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
462 8e00939c Michael Hanselmann
463 72737a7f Iustin Pop
    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
464 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
465 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
466 23752136 Michael Hanselmann
467 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
468 abc1f2ce Michael Hanselmann
    os.rename(old, new)
469 abc1f2ce Michael Hanselmann
470 72737a7f Iustin Pop
    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
471 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
472 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
473 abc1f2ce Michael Hanselmann
474 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
475 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
476 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
477 85f03e0d Michael Hanselmann
    if job_id < 0:
478 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
479 85f03e0d Michael Hanselmann
480 85f03e0d Michael Hanselmann
    return str(job_id)
481 85f03e0d Michael Hanselmann
482 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
483 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
484 f1da30e6 Michael Hanselmann

485 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
486 f1da30e6 Michael Hanselmann

487 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
488 f1da30e6 Michael Hanselmann

489 f1da30e6 Michael Hanselmann
    """
490 f1da30e6 Michael Hanselmann
    # New number
491 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
492 f1da30e6 Michael Hanselmann
493 f1da30e6 Michael Hanselmann
    # Write to file
494 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
495 23752136 Michael Hanselmann
                                        "%s\n" % serial)
496 f1da30e6 Michael Hanselmann
497 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
498 f1da30e6 Michael Hanselmann
    self._last_serial = serial
499 f1da30e6 Michael Hanselmann
500 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
501 f1da30e6 Michael Hanselmann
502 85f03e0d Michael Hanselmann
  @staticmethod
503 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
504 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
505 f1da30e6 Michael Hanselmann
506 85f03e0d Michael Hanselmann
  @staticmethod
507 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
508 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
509 0cb94105 Michael Hanselmann
510 85f03e0d Michael Hanselmann
  @classmethod
511 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
512 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
513 fae737ac Michael Hanselmann
    if m:
514 fae737ac Michael Hanselmann
      return m.group(1)
515 fae737ac Michael Hanselmann
    else:
516 fae737ac Michael Hanselmann
      return None
517 fae737ac Michael Hanselmann
518 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
519 911a495b Iustin Pop
    """Return all known job IDs.
520 911a495b Iustin Pop

521 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
522 911a495b Iustin Pop
    included. Currently this argument is unused.
523 911a495b Iustin Pop

524 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
525 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
526 ac0930b9 Iustin Pop
    extra IDs).
527 ac0930b9 Iustin Pop

528 911a495b Iustin Pop
    """
529 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
530 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
531 f0d874fe Iustin Pop
    return jlist
532 911a495b Iustin Pop
533 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
534 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
535 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
536 f1da30e6 Michael Hanselmann
537 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
538 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
539 5685c1a5 Michael Hanselmann
    if job:
540 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
541 5685c1a5 Michael Hanselmann
      return job
542 ac0930b9 Iustin Pop
543 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
544 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
545 f1da30e6 Michael Hanselmann
    try:
546 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
547 f1da30e6 Michael Hanselmann
    except IOError, err:
548 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
549 f1da30e6 Michael Hanselmann
        return None
550 f1da30e6 Michael Hanselmann
      raise
551 f1da30e6 Michael Hanselmann
    try:
552 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
553 f1da30e6 Michael Hanselmann
    finally:
554 f1da30e6 Michael Hanselmann
      fd.close()
555 f1da30e6 Michael Hanselmann
556 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
557 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
558 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
559 ac0930b9 Iustin Pop
    return job
560 f1da30e6 Michael Hanselmann
561 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
562 911a495b Iustin Pop
    if not job_ids:
563 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
564 f1da30e6 Michael Hanselmann
565 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
566 f1da30e6 Michael Hanselmann
567 686d7433 Iustin Pop
  @staticmethod
568 686d7433 Iustin Pop
  def _IsQueueMarkedDrain():
569 686d7433 Iustin Pop
    """Check if the queue is marked from drain.
570 686d7433 Iustin Pop

571 686d7433 Iustin Pop
    This currently uses the queue drain file, which makes it a
572 686d7433 Iustin Pop
    per-node flag. In the future this can be moved to the config file.
573 686d7433 Iustin Pop

574 686d7433 Iustin Pop
    """
575 686d7433 Iustin Pop
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
576 686d7433 Iustin Pop
577 3ccafd0e Iustin Pop
  @staticmethod
578 3ccafd0e Iustin Pop
  def SetDrainFlag(drain_flag):
579 3ccafd0e Iustin Pop
    """Sets the drain flag for the queue.
580 3ccafd0e Iustin Pop

581 3ccafd0e Iustin Pop
    This is similar to the function L{backend.JobQueueSetDrainFlag},
582 3ccafd0e Iustin Pop
    and in the future we might merge them.
583 3ccafd0e Iustin Pop

584 3ccafd0e Iustin Pop
    """
585 3ccafd0e Iustin Pop
    if drain_flag:
586 3ccafd0e Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
587 3ccafd0e Iustin Pop
    else:
588 3ccafd0e Iustin Pop
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
589 3ccafd0e Iustin Pop
    return True
590 3ccafd0e Iustin Pop
591 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
592 db37da70 Michael Hanselmann
  @_RequireOpenQueue
593 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
594 85f03e0d Michael Hanselmann
    """Create and store a new job.
595 f1da30e6 Michael Hanselmann

596 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
597 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
598 c3f0a12f Iustin Pop

599 c3f0a12f Iustin Pop
    @type ops: list
600 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
601 c3f0a12f Iustin Pop

602 c3f0a12f Iustin Pop
    """
603 686d7433 Iustin Pop
    if self._IsQueueMarkedDrain():
604 686d7433 Iustin Pop
      raise errors.JobQueueDrainError()
605 f1da30e6 Michael Hanselmann
    # Get job identifier
606 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
607 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
608 f1da30e6 Michael Hanselmann
609 f1da30e6 Michael Hanselmann
    # Write to disk
610 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
611 f1da30e6 Michael Hanselmann
612 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
613 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
614 ac0930b9 Iustin Pop
615 85f03e0d Michael Hanselmann
    # Add to worker pool
616 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
617 85f03e0d Michael Hanselmann
618 85f03e0d Michael Hanselmann
    return job.id
619 f1da30e6 Michael Hanselmann
620 db37da70 Michael Hanselmann
  @_RequireOpenQueue
621 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
622 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
623 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
624 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
625 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
626 ac0930b9 Iustin Pop
627 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
628 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
629 dfe57c22 Michael Hanselmann
630 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
631 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
632 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
633 5c735209 Iustin Pop
                        timeout):
634 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
635 6c5a7090 Michael Hanselmann

636 6c5a7090 Michael Hanselmann
    @type job_id: string
637 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
638 6c5a7090 Michael Hanselmann
    @type fields: list of strings
639 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
640 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
641 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
642 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
643 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
644 5c735209 Iustin Pop
    @type timeout: float
645 5c735209 Iustin Pop
    @param timeout: maximum time to wait
646 6c5a7090 Michael Hanselmann

647 6c5a7090 Michael Hanselmann
    """
648 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
649 5c735209 Iustin Pop
    end_time = time.time() + timeout
650 dfe57c22 Michael Hanselmann
    while True:
651 5c735209 Iustin Pop
      delta_time = end_time - time.time()
652 5c735209 Iustin Pop
      if delta_time < 0:
653 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
654 5c735209 Iustin Pop
655 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
656 6c5a7090 Michael Hanselmann
      if not job:
657 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
658 6c5a7090 Michael Hanselmann
        break
659 dfe57c22 Michael Hanselmann
660 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
661 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
662 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
663 dfe57c22 Michael Hanselmann
664 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
665 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
666 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
667 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
668 dfe57c22 Michael Hanselmann
      # significantly different.
669 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
670 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
671 dfe57c22 Michael Hanselmann
672 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
673 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
674 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
675 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
676 6c5a7090 Michael Hanselmann
        # no changes.
677 dfe57c22 Michael Hanselmann
        break
678 dfe57c22 Michael Hanselmann
679 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
680 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
681 6c5a7090 Michael Hanselmann
        break
682 6c5a7090 Michael Hanselmann
683 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
684 6c5a7090 Michael Hanselmann
685 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
686 5c735209 Iustin Pop
      job.change.wait(delta_time)
687 dfe57c22 Michael Hanselmann
688 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
689 dfe57c22 Michael Hanselmann
690 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
691 dfe57c22 Michael Hanselmann
692 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
693 db37da70 Michael Hanselmann
  @_RequireOpenQueue
694 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
695 188c5e0a Michael Hanselmann
    """Cancels a job.
696 188c5e0a Michael Hanselmann

697 188c5e0a Michael Hanselmann
    @type job_id: string
698 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
699 188c5e0a Michael Hanselmann

700 188c5e0a Michael Hanselmann
    """
701 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
702 188c5e0a Michael Hanselmann
703 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
704 188c5e0a Michael Hanselmann
    if not job:
705 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
706 188c5e0a Michael Hanselmann
      return
707 188c5e0a Michael Hanselmann
708 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
709 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
710 188c5e0a Michael Hanselmann
      return
711 188c5e0a Michael Hanselmann
712 85f03e0d Michael Hanselmann
    try:
713 85f03e0d Michael Hanselmann
      for op in job.ops:
714 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
715 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
716 85f03e0d Michael Hanselmann
    finally:
717 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
718 188c5e0a Michael Hanselmann
719 db37da70 Michael Hanselmann
  @_RequireOpenQueue
720 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
721 c609f802 Michael Hanselmann
    """Archives a job.
722 c609f802 Michael Hanselmann

723 c609f802 Michael Hanselmann
    @type job_id: string
724 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
725 c609f802 Michael Hanselmann

726 c609f802 Michael Hanselmann
    """
727 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
728 c609f802 Michael Hanselmann
729 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
730 c609f802 Michael Hanselmann
    if not job:
731 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
732 c609f802 Michael Hanselmann
      return
733 c609f802 Michael Hanselmann
734 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
735 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
736 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
737 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
738 c609f802 Michael Hanselmann
      return
739 c609f802 Michael Hanselmann
740 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
741 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
742 c609f802 Michael Hanselmann
743 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
744 c609f802 Michael Hanselmann
745 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
746 f1da30e6 Michael Hanselmann
747 07cd723a Iustin Pop
  @utils.LockedMethod
748 07cd723a Iustin Pop
  @_RequireOpenQueue
749 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
750 07cd723a Iustin Pop
    """Archives a job.
751 07cd723a Iustin Pop

752 07cd723a Iustin Pop
    @type job_id: string
753 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
754 07cd723a Iustin Pop

755 07cd723a Iustin Pop
    """
756 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
757 07cd723a Iustin Pop
758 07cd723a Iustin Pop
  @utils.LockedMethod
759 07cd723a Iustin Pop
  @_RequireOpenQueue
760 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
761 07cd723a Iustin Pop
    """Archives all jobs based on age.
762 07cd723a Iustin Pop

763 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
764 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
765 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
766 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
767 07cd723a Iustin Pop

768 07cd723a Iustin Pop
    @type age: int
769 07cd723a Iustin Pop
    @param age: the minimum age in seconds
770 07cd723a Iustin Pop

771 07cd723a Iustin Pop
    """
772 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
773 07cd723a Iustin Pop
774 07cd723a Iustin Pop
    now = time.time()
775 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
776 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
777 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
778 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
779 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
780 07cd723a Iustin Pop
        continue
781 07cd723a Iustin Pop
      if job.end_timestamp is None:
782 07cd723a Iustin Pop
        if job.start_timestamp is None:
783 07cd723a Iustin Pop
          job_age = job.received_timestamp
784 07cd723a Iustin Pop
        else:
785 07cd723a Iustin Pop
          job_age = job.start_timestamp
786 07cd723a Iustin Pop
      else:
787 07cd723a Iustin Pop
        job_age = job.end_timestamp
788 07cd723a Iustin Pop
789 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
790 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
791 07cd723a Iustin Pop
792 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
793 e2715f69 Michael Hanselmann
    row = []
794 e2715f69 Michael Hanselmann
    for fname in fields:
795 e2715f69 Michael Hanselmann
      if fname == "id":
796 e2715f69 Michael Hanselmann
        row.append(job.id)
797 e2715f69 Michael Hanselmann
      elif fname == "status":
798 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
799 af30b2fd Michael Hanselmann
      elif fname == "ops":
800 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
801 af30b2fd Michael Hanselmann
      elif fname == "opresult":
802 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
803 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
804 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
805 5b23c34c Iustin Pop
      elif fname == "oplog":
806 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
807 c56ec146 Iustin Pop
      elif fname == "opstart":
808 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
809 c56ec146 Iustin Pop
      elif fname == "opend":
810 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
811 c56ec146 Iustin Pop
      elif fname == "received_ts":
812 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
813 c56ec146 Iustin Pop
      elif fname == "start_ts":
814 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
815 c56ec146 Iustin Pop
      elif fname == "end_ts":
816 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
817 60dd1473 Iustin Pop
      elif fname == "summary":
818 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
819 e2715f69 Michael Hanselmann
      else:
820 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
821 e2715f69 Michael Hanselmann
    return row
822 e2715f69 Michael Hanselmann
823 85f03e0d Michael Hanselmann
  @utils.LockedMethod
824 db37da70 Michael Hanselmann
  @_RequireOpenQueue
825 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
826 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
827 e2715f69 Michael Hanselmann

828 e2715f69 Michael Hanselmann
    Args:
829 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
830 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
831 e2715f69 Michael Hanselmann

832 e2715f69 Michael Hanselmann
    """
833 85f03e0d Michael Hanselmann
    jobs = []
834 e2715f69 Michael Hanselmann
835 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
836 85f03e0d Michael Hanselmann
      if job is None:
837 85f03e0d Michael Hanselmann
        jobs.append(None)
838 85f03e0d Michael Hanselmann
      else:
839 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
840 e2715f69 Michael Hanselmann
841 85f03e0d Michael Hanselmann
    return jobs
842 e2715f69 Michael Hanselmann
843 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
844 db37da70 Michael Hanselmann
  @_RequireOpenQueue
845 e2715f69 Michael Hanselmann
  def Shutdown(self):
846 e2715f69 Michael Hanselmann
    """Stops the job queue.
847 e2715f69 Michael Hanselmann

848 e2715f69 Michael Hanselmann
    """
849 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
850 85f03e0d Michael Hanselmann
851 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
852 04ab05ce Michael Hanselmann
    self._queue_lock = None