Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b9bddb6b

History | View | Annotate | Download (23.3 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 6c5a7090 Michael Hanselmann
Locking:
25 6c5a7090 Michael Hanselmann
There's a single, large lock in the JobQueue class. It's used by all other
26 6c5a7090 Michael Hanselmann
classes in this module.
27 6c5a7090 Michael Hanselmann

28 6c5a7090 Michael Hanselmann
"""
29 498ae1cc Iustin Pop
30 f1da30e6 Michael Hanselmann
import os
31 e2715f69 Michael Hanselmann
import logging
32 e2715f69 Michael Hanselmann
import threading
33 f1da30e6 Michael Hanselmann
import errno
34 f1da30e6 Michael Hanselmann
import re
35 f1048938 Iustin Pop
import time
36 5685c1a5 Michael Hanselmann
import weakref
37 498ae1cc Iustin Pop
38 e2715f69 Michael Hanselmann
from ganeti import constants
39 f1da30e6 Michael Hanselmann
from ganeti import serializer
40 e2715f69 Michael Hanselmann
from ganeti import workerpool
41 f1da30e6 Michael Hanselmann
from ganeti import opcodes
42 7a1ecaed Iustin Pop
from ganeti import errors
43 e2715f69 Michael Hanselmann
from ganeti import mcpu
44 7996a135 Iustin Pop
from ganeti import utils
45 04ab05ce Michael Hanselmann
from ganeti import jstore
46 c3f0a12f Iustin Pop
from ganeti import rpc
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
49 1daae384 Iustin Pop
JOBQUEUE_THREADS = 25
50 e2715f69 Michael Hanselmann
51 498ae1cc Iustin Pop
52 70552c46 Michael Hanselmann
def TimeStampNow():
53 70552c46 Michael Hanselmann
  return utils.SplitTime(time.time())
54 70552c46 Michael Hanselmann
55 70552c46 Michael Hanselmann
56 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
57 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
58 e2715f69 Michael Hanselmann

59 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
60 6c5a7090 Michael Hanselmann
  of the form (log_serial, timestamp, level, message).
61 f1048938 Iustin Pop

62 e2715f69 Michael Hanselmann
  """
63 85f03e0d Michael Hanselmann
  def __init__(self, op):
64 85f03e0d Michael Hanselmann
    self.input = op
65 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
66 85f03e0d Michael Hanselmann
    self.result = None
67 85f03e0d Michael Hanselmann
    self.log = []
68 70552c46 Michael Hanselmann
    self.start_timestamp = None
69 70552c46 Michael Hanselmann
    self.end_timestamp = None
70 f1da30e6 Michael Hanselmann
71 f1da30e6 Michael Hanselmann
  @classmethod
72 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
73 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
74 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75 85f03e0d Michael Hanselmann
    obj.status = state["status"]
76 85f03e0d Michael Hanselmann
    obj.result = state["result"]
77 85f03e0d Michael Hanselmann
    obj.log = state["log"]
78 70552c46 Michael Hanselmann
    obj.start_timestamp = state.get("start_timestamp", None)
79 70552c46 Michael Hanselmann
    obj.end_timestamp = state.get("end_timestamp", None)
80 f1da30e6 Michael Hanselmann
    return obj
81 f1da30e6 Michael Hanselmann
82 f1da30e6 Michael Hanselmann
  def Serialize(self):
83 6c5a7090 Michael Hanselmann
    return {
84 6c5a7090 Michael Hanselmann
      "input": self.input.__getstate__(),
85 6c5a7090 Michael Hanselmann
      "status": self.status,
86 6c5a7090 Michael Hanselmann
      "result": self.result,
87 6c5a7090 Michael Hanselmann
      "log": self.log,
88 70552c46 Michael Hanselmann
      "start_timestamp": self.start_timestamp,
89 70552c46 Michael Hanselmann
      "end_timestamp": self.end_timestamp,
90 6c5a7090 Michael Hanselmann
      }
91 f1048938 Iustin Pop
92 e2715f69 Michael Hanselmann
93 e2715f69 Michael Hanselmann
class _QueuedJob(object):
94 e2715f69 Michael Hanselmann
  """In-memory job representation.
95 e2715f69 Michael Hanselmann

96 6c5a7090 Michael Hanselmann
  This is what we use to track the user-submitted jobs. Locking must be taken
97 6c5a7090 Michael Hanselmann
  care of by users of this class.
98 e2715f69 Michael Hanselmann

99 e2715f69 Michael Hanselmann
  """
100 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
101 e2715f69 Michael Hanselmann
    if not ops:
102 e2715f69 Michael Hanselmann
      # TODO
103 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
104 e2715f69 Michael Hanselmann
105 85f03e0d Michael Hanselmann
    self.queue = queue
106 f1da30e6 Michael Hanselmann
    self.id = job_id
107 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
108 85f03e0d Michael Hanselmann
    self.run_op_index = -1
109 6c5a7090 Michael Hanselmann
    self.log_serial = 0
110 c56ec146 Iustin Pop
    self.received_timestamp = TimeStampNow()
111 c56ec146 Iustin Pop
    self.start_timestamp = None
112 c56ec146 Iustin Pop
    self.end_timestamp = None
113 6c5a7090 Michael Hanselmann
114 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
115 6c5a7090 Michael Hanselmann
    self.change = threading.Condition(self.queue._lock)
116 f1da30e6 Michael Hanselmann
117 f1da30e6 Michael Hanselmann
  @classmethod
118 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
119 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
120 85f03e0d Michael Hanselmann
    obj.queue = queue
121 85f03e0d Michael Hanselmann
    obj.id = state["id"]
122 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
123 c56ec146 Iustin Pop
    obj.received_timestamp = state.get("received_timestamp", None)
124 c56ec146 Iustin Pop
    obj.start_timestamp = state.get("start_timestamp", None)
125 c56ec146 Iustin Pop
    obj.end_timestamp = state.get("end_timestamp", None)
126 6c5a7090 Michael Hanselmann
127 6c5a7090 Michael Hanselmann
    obj.ops = []
128 6c5a7090 Michael Hanselmann
    obj.log_serial = 0
129 6c5a7090 Michael Hanselmann
    for op_state in state["ops"]:
130 6c5a7090 Michael Hanselmann
      op = _QueuedOpCode.Restore(op_state)
131 6c5a7090 Michael Hanselmann
      for log_entry in op.log:
132 6c5a7090 Michael Hanselmann
        obj.log_serial = max(obj.log_serial, log_entry[0])
133 6c5a7090 Michael Hanselmann
      obj.ops.append(op)
134 6c5a7090 Michael Hanselmann
135 6c5a7090 Michael Hanselmann
    # Condition to wait for changes
136 6c5a7090 Michael Hanselmann
    obj.change = threading.Condition(obj.queue._lock)
137 6c5a7090 Michael Hanselmann
138 f1da30e6 Michael Hanselmann
    return obj
139 f1da30e6 Michael Hanselmann
140 f1da30e6 Michael Hanselmann
  def Serialize(self):
141 f1da30e6 Michael Hanselmann
    return {
142 f1da30e6 Michael Hanselmann
      "id": self.id,
143 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
144 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
145 c56ec146 Iustin Pop
      "start_timestamp": self.start_timestamp,
146 c56ec146 Iustin Pop
      "end_timestamp": self.end_timestamp,
147 c56ec146 Iustin Pop
      "received_timestamp": self.received_timestamp,
148 f1da30e6 Michael Hanselmann
      }
149 f1da30e6 Michael Hanselmann
150 85f03e0d Michael Hanselmann
  def CalcStatus(self):
151 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
152 e2715f69 Michael Hanselmann
153 e2715f69 Michael Hanselmann
    all_success = True
154 85f03e0d Michael Hanselmann
    for op in self.ops:
155 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
156 e2715f69 Michael Hanselmann
        continue
157 e2715f69 Michael Hanselmann
158 e2715f69 Michael Hanselmann
      all_success = False
159 e2715f69 Michael Hanselmann
160 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
161 e2715f69 Michael Hanselmann
        pass
162 e92376d7 Iustin Pop
      elif op.status == constants.OP_STATUS_WAITLOCK:
163 e92376d7 Iustin Pop
        status = constants.JOB_STATUS_WAITLOCK
164 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
165 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
166 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
167 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
168 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
169 f1da30e6 Michael Hanselmann
        break
170 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
171 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
172 4cb1d919 Michael Hanselmann
        break
173 e2715f69 Michael Hanselmann
174 e2715f69 Michael Hanselmann
    if all_success:
175 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
176 e2715f69 Michael Hanselmann
177 e2715f69 Michael Hanselmann
    return status
178 e2715f69 Michael Hanselmann
179 6c5a7090 Michael Hanselmann
  def GetLogEntries(self, newer_than):
180 6c5a7090 Michael Hanselmann
    if newer_than is None:
181 6c5a7090 Michael Hanselmann
      serial = -1
182 6c5a7090 Michael Hanselmann
    else:
183 6c5a7090 Michael Hanselmann
      serial = newer_than
184 6c5a7090 Michael Hanselmann
185 6c5a7090 Michael Hanselmann
    entries = []
186 6c5a7090 Michael Hanselmann
    for op in self.ops:
187 6c5a7090 Michael Hanselmann
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
188 6c5a7090 Michael Hanselmann
189 6c5a7090 Michael Hanselmann
    return entries
190 6c5a7090 Michael Hanselmann
191 f1048938 Iustin Pop
192 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
193 e92376d7 Iustin Pop
  def _NotifyStart(self):
194 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
195 e92376d7 Iustin Pop

196 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
197 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
198 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
199 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
200 e92376d7 Iustin Pop

201 e92376d7 Iustin Pop
    """
202 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
203 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
204 e92376d7 Iustin Pop
205 e92376d7 Iustin Pop
    self.queue.acquire()
206 e92376d7 Iustin Pop
    try:
207 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
208 e92376d7 Iustin Pop
    finally:
209 e92376d7 Iustin Pop
      self.queue.release()
210 e92376d7 Iustin Pop
211 85f03e0d Michael Hanselmann
  def RunTask(self, job):
212 e2715f69 Michael Hanselmann
    """Job executor.
213 e2715f69 Michael Hanselmann

214 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
215 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
216 e2715f69 Michael Hanselmann

217 e2715f69 Michael Hanselmann
    """
218 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
219 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
220 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
221 e92376d7 Iustin Pop
    self.queue = queue = job.queue
222 e2715f69 Michael Hanselmann
    try:
223 85f03e0d Michael Hanselmann
      try:
224 85f03e0d Michael Hanselmann
        count = len(job.ops)
225 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
226 85f03e0d Michael Hanselmann
          try:
227 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
228 85f03e0d Michael Hanselmann
229 85f03e0d Michael Hanselmann
            queue.acquire()
230 85f03e0d Michael Hanselmann
            try:
231 85f03e0d Michael Hanselmann
              job.run_op_index = idx
232 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
233 85f03e0d Michael Hanselmann
              op.result = None
234 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
235 c56ec146 Iustin Pop
              if idx == 0: # first opcode
236 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
237 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
238 85f03e0d Michael Hanselmann
239 38206f3c Iustin Pop
              input_opcode = op.input
240 85f03e0d Michael Hanselmann
            finally:
241 85f03e0d Michael Hanselmann
              queue.release()
242 85f03e0d Michael Hanselmann
243 dfe57c22 Michael Hanselmann
            def _Log(*args):
244 6c5a7090 Michael Hanselmann
              """Append a log entry.
245 6c5a7090 Michael Hanselmann

246 6c5a7090 Michael Hanselmann
              """
247 6c5a7090 Michael Hanselmann
              assert len(args) < 3
248 6c5a7090 Michael Hanselmann
249 6c5a7090 Michael Hanselmann
              if len(args) == 1:
250 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
251 6c5a7090 Michael Hanselmann
                log_msg = args[0]
252 6c5a7090 Michael Hanselmann
              else:
253 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
254 6c5a7090 Michael Hanselmann
255 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
256 6c5a7090 Michael Hanselmann
              # precision.
257 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
258 dfe57c22 Michael Hanselmann
259 6c5a7090 Michael Hanselmann
              queue.acquire()
260 dfe57c22 Michael Hanselmann
              try:
261 6c5a7090 Michael Hanselmann
                job.log_serial += 1
262 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
263 6c5a7090 Michael Hanselmann
264 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
265 dfe57c22 Michael Hanselmann
              finally:
266 6c5a7090 Michael Hanselmann
                queue.release()
267 dfe57c22 Michael Hanselmann
268 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
269 e92376d7 Iustin Pop
            self.opcode = op
270 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
271 85f03e0d Michael Hanselmann
272 85f03e0d Michael Hanselmann
            queue.acquire()
273 85f03e0d Michael Hanselmann
            try:
274 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
275 85f03e0d Michael Hanselmann
              op.result = result
276 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
277 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
278 85f03e0d Michael Hanselmann
            finally:
279 85f03e0d Michael Hanselmann
              queue.release()
280 85f03e0d Michael Hanselmann
281 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
282 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
283 85f03e0d Michael Hanselmann
          except Exception, err:
284 85f03e0d Michael Hanselmann
            queue.acquire()
285 85f03e0d Michael Hanselmann
            try:
286 85f03e0d Michael Hanselmann
              try:
287 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
288 85f03e0d Michael Hanselmann
                op.result = str(err)
289 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
290 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
291 85f03e0d Michael Hanselmann
              finally:
292 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
293 85f03e0d Michael Hanselmann
            finally:
294 85f03e0d Michael Hanselmann
              queue.release()
295 85f03e0d Michael Hanselmann
            raise
296 85f03e0d Michael Hanselmann
297 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
298 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
299 85f03e0d Michael Hanselmann
      except:
300 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
301 e2715f69 Michael Hanselmann
    finally:
302 85f03e0d Michael Hanselmann
      queue.acquire()
303 85f03e0d Michael Hanselmann
      try:
304 65548ed5 Michael Hanselmann
        try:
305 65548ed5 Michael Hanselmann
          job.run_op_idx = -1
306 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
307 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
308 65548ed5 Michael Hanselmann
        finally:
309 65548ed5 Michael Hanselmann
          job_id = job.id
310 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
311 85f03e0d Michael Hanselmann
      finally:
312 85f03e0d Michael Hanselmann
        queue.release()
313 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
314 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
315 e2715f69 Michael Hanselmann
316 e2715f69 Michael Hanselmann
317 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
318 5bdce580 Michael Hanselmann
  def __init__(self, queue):
319 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
320 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
321 5bdce580 Michael Hanselmann
    self.queue = queue
322 e2715f69 Michael Hanselmann
323 e2715f69 Michael Hanselmann
324 85f03e0d Michael Hanselmann
class JobQueue(object):
325 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
326 f1da30e6 Michael Hanselmann
327 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
328 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
329 db37da70 Michael Hanselmann

330 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
331 db37da70 Michael Hanselmann
    usually called from other classes.
332 db37da70 Michael Hanselmann

333 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
334 db37da70 Michael Hanselmann

335 db37da70 Michael Hanselmann
    Example:
336 db37da70 Michael Hanselmann
      @utils.LockedMethod
337 db37da70 Michael Hanselmann
      @_RequireOpenQueue
338 db37da70 Michael Hanselmann
      def Example(self):
339 db37da70 Michael Hanselmann
        pass
340 db37da70 Michael Hanselmann

341 db37da70 Michael Hanselmann
    """
342 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
343 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
344 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
345 db37da70 Michael Hanselmann
    return wrapper
346 db37da70 Michael Hanselmann
347 85f03e0d Michael Hanselmann
  def __init__(self, context):
348 5bdce580 Michael Hanselmann
    self.context = context
349 5685c1a5 Michael Hanselmann
    self._memcache = weakref.WeakValueDictionary()
350 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
351 f1da30e6 Michael Hanselmann
352 85f03e0d Michael Hanselmann
    # Locking
353 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
354 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
355 85f03e0d Michael Hanselmann
    self.release = self._lock.release
356 85f03e0d Michael Hanselmann
357 04ab05ce Michael Hanselmann
    # Initialize
358 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
359 f1da30e6 Michael Hanselmann
360 04ab05ce Michael Hanselmann
    # Read serial file
361 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
362 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
363 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
364 c4beba1c Iustin Pop
365 23752136 Michael Hanselmann
    # Get initial list of nodes
366 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
367 8e00939c Michael Hanselmann
368 8e00939c Michael Hanselmann
    # Remove master node
369 8e00939c Michael Hanselmann
    try:
370 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
371 8e00939c Michael Hanselmann
    except ValueError:
372 8e00939c Michael Hanselmann
      pass
373 23752136 Michael Hanselmann
374 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
375 23752136 Michael Hanselmann
376 85f03e0d Michael Hanselmann
    # Setup worker pool
377 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
378 85f03e0d Michael Hanselmann
379 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
380 85f03e0d Michael Hanselmann
    # we're still doing our work.
381 85f03e0d Michael Hanselmann
    self.acquire()
382 85f03e0d Michael Hanselmann
    try:
383 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
384 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
385 85f03e0d Michael Hanselmann
386 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
387 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
388 85f03e0d Michael Hanselmann
389 e92376d7 Iustin Pop
        elif status in (constants.JOB_STATUS_RUNNING,
390 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
391 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
392 85f03e0d Michael Hanselmann
          try:
393 85f03e0d Michael Hanselmann
            for op in job.ops:
394 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
395 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
396 85f03e0d Michael Hanselmann
          finally:
397 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
398 85f03e0d Michael Hanselmann
    finally:
399 85f03e0d Michael Hanselmann
      self.release()
400 85f03e0d Michael Hanselmann
401 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
402 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
403 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
404 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
405 23752136 Michael Hanselmann
406 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
407 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
408 23752136 Michael Hanselmann
409 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
410 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
411 23752136 Michael Hanselmann
412 d2e03a33 Michael Hanselmann
    # Upload current serial file
413 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
414 d2e03a33 Michael Hanselmann
415 d2e03a33 Michael Hanselmann
    for file_name in files:
416 9f774ee8 Michael Hanselmann
      # Read file content
417 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
418 9f774ee8 Michael Hanselmann
      try:
419 9f774ee8 Michael Hanselmann
        content = fd.read()
420 9f774ee8 Michael Hanselmann
      finally:
421 9f774ee8 Michael Hanselmann
        fd.close()
422 9f774ee8 Michael Hanselmann
423 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
424 d2e03a33 Michael Hanselmann
      if not result[node_name]:
425 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
426 d2e03a33 Michael Hanselmann
427 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
428 d2e03a33 Michael Hanselmann
429 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
430 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
431 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
432 23752136 Michael Hanselmann
    try:
433 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
434 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
435 d2e03a33 Michael Hanselmann
    except KeyError:
436 23752136 Michael Hanselmann
      pass
437 23752136 Michael Hanselmann
438 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
439 e74798c1 Michael Hanselmann
    failed = []
440 e74798c1 Michael Hanselmann
    success = []
441 e74798c1 Michael Hanselmann
442 e74798c1 Michael Hanselmann
    for node in nodes:
443 e74798c1 Michael Hanselmann
      if result[node]:
444 e74798c1 Michael Hanselmann
        success.append(node)
445 e74798c1 Michael Hanselmann
      else:
446 e74798c1 Michael Hanselmann
        failed.append(node)
447 e74798c1 Michael Hanselmann
448 e74798c1 Michael Hanselmann
    if failed:
449 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
450 e74798c1 Michael Hanselmann
451 e74798c1 Michael Hanselmann
    # +1 for the master node
452 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
453 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
454 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
455 e74798c1 Michael Hanselmann
456 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
457 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
458 8e00939c Michael Hanselmann

459 8e00939c Michael Hanselmann
    """
460 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
461 8e00939c Michael Hanselmann
462 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
463 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
464 e74798c1 Michael Hanselmann
                         "Updating %s" % file_name)
465 23752136 Michael Hanselmann
466 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
467 abc1f2ce Michael Hanselmann
    os.rename(old, new)
468 abc1f2ce Michael Hanselmann
469 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
470 e74798c1 Michael Hanselmann
    self._CheckRpcResult(result, self._nodes,
471 e74798c1 Michael Hanselmann
                         "Moving %s to %s" % (old, new))
472 abc1f2ce Michael Hanselmann
473 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
474 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
475 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
476 85f03e0d Michael Hanselmann
    if job_id < 0:
477 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
478 85f03e0d Michael Hanselmann
479 85f03e0d Michael Hanselmann
    return str(job_id)
480 85f03e0d Michael Hanselmann
481 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
482 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
483 f1da30e6 Michael Hanselmann

484 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
485 f1da30e6 Michael Hanselmann

486 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
487 f1da30e6 Michael Hanselmann

488 f1da30e6 Michael Hanselmann
    """
489 f1da30e6 Michael Hanselmann
    # New number
490 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
491 f1da30e6 Michael Hanselmann
492 f1da30e6 Michael Hanselmann
    # Write to file
493 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
494 23752136 Michael Hanselmann
                                        "%s\n" % serial)
495 f1da30e6 Michael Hanselmann
496 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
497 f1da30e6 Michael Hanselmann
    self._last_serial = serial
498 f1da30e6 Michael Hanselmann
499 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
500 f1da30e6 Michael Hanselmann
501 85f03e0d Michael Hanselmann
  @staticmethod
502 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
503 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
504 f1da30e6 Michael Hanselmann
505 85f03e0d Michael Hanselmann
  @staticmethod
506 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
507 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
508 0cb94105 Michael Hanselmann
509 85f03e0d Michael Hanselmann
  @classmethod
510 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
511 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
512 fae737ac Michael Hanselmann
    if m:
513 fae737ac Michael Hanselmann
      return m.group(1)
514 fae737ac Michael Hanselmann
    else:
515 fae737ac Michael Hanselmann
      return None
516 fae737ac Michael Hanselmann
517 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
518 911a495b Iustin Pop
    """Return all known job IDs.
519 911a495b Iustin Pop

520 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
521 911a495b Iustin Pop
    included. Currently this argument is unused.
522 911a495b Iustin Pop

523 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
524 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
525 ac0930b9 Iustin Pop
    extra IDs).
526 ac0930b9 Iustin Pop

527 911a495b Iustin Pop
    """
528 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
529 3b87986e Iustin Pop
    jlist = utils.NiceSort(jlist)
530 f0d874fe Iustin Pop
    return jlist
531 911a495b Iustin Pop
532 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
533 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
534 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
535 f1da30e6 Michael Hanselmann
536 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
537 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
538 5685c1a5 Michael Hanselmann
    if job:
539 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
540 5685c1a5 Michael Hanselmann
      return job
541 ac0930b9 Iustin Pop
542 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
543 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
544 f1da30e6 Michael Hanselmann
    try:
545 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
546 f1da30e6 Michael Hanselmann
    except IOError, err:
547 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
548 f1da30e6 Michael Hanselmann
        return None
549 f1da30e6 Michael Hanselmann
      raise
550 f1da30e6 Michael Hanselmann
    try:
551 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
552 f1da30e6 Michael Hanselmann
    finally:
553 f1da30e6 Michael Hanselmann
      fd.close()
554 f1da30e6 Michael Hanselmann
555 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
556 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
557 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
558 ac0930b9 Iustin Pop
    return job
559 f1da30e6 Michael Hanselmann
560 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
561 911a495b Iustin Pop
    if not job_ids:
562 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
563 f1da30e6 Michael Hanselmann
564 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
565 f1da30e6 Michael Hanselmann
566 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
567 db37da70 Michael Hanselmann
  @_RequireOpenQueue
568 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
569 85f03e0d Michael Hanselmann
    """Create and store a new job.
570 f1da30e6 Michael Hanselmann

571 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
572 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
573 c3f0a12f Iustin Pop

574 c3f0a12f Iustin Pop
    @type ops: list
575 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
576 c3f0a12f Iustin Pop

577 c3f0a12f Iustin Pop
    """
578 f1da30e6 Michael Hanselmann
    # Get job identifier
579 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
580 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
581 f1da30e6 Michael Hanselmann
582 f1da30e6 Michael Hanselmann
    # Write to disk
583 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
584 f1da30e6 Michael Hanselmann
585 5685c1a5 Michael Hanselmann
    logging.debug("Adding new job %s to the cache", job_id)
586 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
587 ac0930b9 Iustin Pop
588 85f03e0d Michael Hanselmann
    # Add to worker pool
589 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
590 85f03e0d Michael Hanselmann
591 85f03e0d Michael Hanselmann
    return job.id
592 f1da30e6 Michael Hanselmann
593 db37da70 Michael Hanselmann
  @_RequireOpenQueue
594 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
595 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
596 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
597 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
598 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
599 ac0930b9 Iustin Pop
600 dfe57c22 Michael Hanselmann
    # Notify waiters about potential changes
601 6c5a7090 Michael Hanselmann
    job.change.notifyAll()
602 dfe57c22 Michael Hanselmann
603 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
604 dfe57c22 Michael Hanselmann
  @_RequireOpenQueue
605 5c735209 Iustin Pop
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
606 5c735209 Iustin Pop
                        timeout):
607 6c5a7090 Michael Hanselmann
    """Waits for changes in a job.
608 6c5a7090 Michael Hanselmann

609 6c5a7090 Michael Hanselmann
    @type job_id: string
610 6c5a7090 Michael Hanselmann
    @param job_id: Job identifier
611 6c5a7090 Michael Hanselmann
    @type fields: list of strings
612 6c5a7090 Michael Hanselmann
    @param fields: Which fields to check for changes
613 6c5a7090 Michael Hanselmann
    @type prev_job_info: list or None
614 6c5a7090 Michael Hanselmann
    @param prev_job_info: Last job information returned
615 6c5a7090 Michael Hanselmann
    @type prev_log_serial: int
616 6c5a7090 Michael Hanselmann
    @param prev_log_serial: Last job message serial number
617 5c735209 Iustin Pop
    @type timeout: float
618 5c735209 Iustin Pop
    @param timeout: maximum time to wait
619 6c5a7090 Michael Hanselmann

620 6c5a7090 Michael Hanselmann
    """
621 dfe57c22 Michael Hanselmann
    logging.debug("Waiting for changes in job %s", job_id)
622 5c735209 Iustin Pop
    end_time = time.time() + timeout
623 dfe57c22 Michael Hanselmann
    while True:
624 5c735209 Iustin Pop
      delta_time = end_time - time.time()
625 5c735209 Iustin Pop
      if delta_time < 0:
626 5c735209 Iustin Pop
        return constants.JOB_NOTCHANGED
627 5c735209 Iustin Pop
628 6c5a7090 Michael Hanselmann
      job = self._LoadJobUnlocked(job_id)
629 6c5a7090 Michael Hanselmann
      if not job:
630 6c5a7090 Michael Hanselmann
        logging.debug("Job %s not found", job_id)
631 6c5a7090 Michael Hanselmann
        break
632 dfe57c22 Michael Hanselmann
633 6c5a7090 Michael Hanselmann
      status = job.CalcStatus()
634 6c5a7090 Michael Hanselmann
      job_info = self._GetJobInfoUnlocked(job, fields)
635 6c5a7090 Michael Hanselmann
      log_entries = job.GetLogEntries(prev_log_serial)
636 dfe57c22 Michael Hanselmann
637 dfe57c22 Michael Hanselmann
      # Serializing and deserializing data can cause type changes (e.g. from
638 dfe57c22 Michael Hanselmann
      # tuple to list) or precision loss. We're doing it here so that we get
639 dfe57c22 Michael Hanselmann
      # the same modifications as the data received from the client. Without
640 dfe57c22 Michael Hanselmann
      # this, the comparison afterwards might fail without the data being
641 dfe57c22 Michael Hanselmann
      # significantly different.
642 6c5a7090 Michael Hanselmann
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
643 6c5a7090 Michael Hanselmann
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
644 dfe57c22 Michael Hanselmann
645 6c5a7090 Michael Hanselmann
      if status not in (constants.JOB_STATUS_QUEUED,
646 e92376d7 Iustin Pop
                        constants.JOB_STATUS_RUNNING,
647 e92376d7 Iustin Pop
                        constants.JOB_STATUS_WAITLOCK):
648 6c5a7090 Michael Hanselmann
        # Don't even try to wait if the job is no longer running, there will be
649 6c5a7090 Michael Hanselmann
        # no changes.
650 dfe57c22 Michael Hanselmann
        break
651 dfe57c22 Michael Hanselmann
652 6c5a7090 Michael Hanselmann
      if (prev_job_info != job_info or
653 6c5a7090 Michael Hanselmann
          (log_entries and prev_log_serial != log_entries[0][0])):
654 6c5a7090 Michael Hanselmann
        break
655 6c5a7090 Michael Hanselmann
656 6c5a7090 Michael Hanselmann
      logging.debug("Waiting again")
657 6c5a7090 Michael Hanselmann
658 6c5a7090 Michael Hanselmann
      # Release the queue lock while waiting
659 5c735209 Iustin Pop
      job.change.wait(delta_time)
660 dfe57c22 Michael Hanselmann
661 dfe57c22 Michael Hanselmann
    logging.debug("Job %s changed", job_id)
662 dfe57c22 Michael Hanselmann
663 6c5a7090 Michael Hanselmann
    return (job_info, log_entries)
664 dfe57c22 Michael Hanselmann
665 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
666 db37da70 Michael Hanselmann
  @_RequireOpenQueue
667 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
668 188c5e0a Michael Hanselmann
    """Cancels a job.
669 188c5e0a Michael Hanselmann

670 188c5e0a Michael Hanselmann
    @type job_id: string
671 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
672 188c5e0a Michael Hanselmann

673 188c5e0a Michael Hanselmann
    """
674 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
675 188c5e0a Michael Hanselmann
676 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
677 188c5e0a Michael Hanselmann
    if not job:
678 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
679 188c5e0a Michael Hanselmann
      return
680 188c5e0a Michael Hanselmann
681 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
682 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
683 188c5e0a Michael Hanselmann
      return
684 188c5e0a Michael Hanselmann
685 85f03e0d Michael Hanselmann
    try:
686 85f03e0d Michael Hanselmann
      for op in job.ops:
687 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
688 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
689 85f03e0d Michael Hanselmann
    finally:
690 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
691 188c5e0a Michael Hanselmann
692 db37da70 Michael Hanselmann
  @_RequireOpenQueue
693 07cd723a Iustin Pop
  def _ArchiveJobUnlocked(self, job_id):
694 c609f802 Michael Hanselmann
    """Archives a job.
695 c609f802 Michael Hanselmann

696 c609f802 Michael Hanselmann
    @type job_id: string
697 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
698 c609f802 Michael Hanselmann

699 c609f802 Michael Hanselmann
    """
700 07cd723a Iustin Pop
    logging.info("Archiving job %s", job_id)
701 c609f802 Michael Hanselmann
702 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
703 c609f802 Michael Hanselmann
    if not job:
704 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
705 c609f802 Michael Hanselmann
      return
706 c609f802 Michael Hanselmann
707 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
708 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
709 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
710 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
711 c609f802 Michael Hanselmann
      return
712 c609f802 Michael Hanselmann
713 5685c1a5 Michael Hanselmann
    old = self._GetJobPath(job.id)
714 5685c1a5 Michael Hanselmann
    new = self._GetArchivedJobPath(job.id)
715 c609f802 Michael Hanselmann
716 5685c1a5 Michael Hanselmann
    self._RenameFileUnlocked(old, new)
717 c609f802 Michael Hanselmann
718 5685c1a5 Michael Hanselmann
    logging.debug("Successfully archived job %s", job.id)
719 f1da30e6 Michael Hanselmann
720 07cd723a Iustin Pop
  @utils.LockedMethod
721 07cd723a Iustin Pop
  @_RequireOpenQueue
722 07cd723a Iustin Pop
  def ArchiveJob(self, job_id):
723 07cd723a Iustin Pop
    """Archives a job.
724 07cd723a Iustin Pop

725 07cd723a Iustin Pop
    @type job_id: string
726 07cd723a Iustin Pop
    @param job_id: Job ID of job to be archived.
727 07cd723a Iustin Pop

728 07cd723a Iustin Pop
    """
729 07cd723a Iustin Pop
    return self._ArchiveJobUnlocked(job_id)
730 07cd723a Iustin Pop
731 07cd723a Iustin Pop
  @utils.LockedMethod
732 07cd723a Iustin Pop
  @_RequireOpenQueue
733 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
734 07cd723a Iustin Pop
    """Archives all jobs based on age.
735 07cd723a Iustin Pop

736 07cd723a Iustin Pop
    The method will archive all jobs which are older than the age
737 07cd723a Iustin Pop
    parameter. For jobs that don't have an end timestamp, the start
738 07cd723a Iustin Pop
    timestamp will be considered. The special '-1' age will cause
739 07cd723a Iustin Pop
    archival of all jobs (that are not running or queued).
740 07cd723a Iustin Pop

741 07cd723a Iustin Pop
    @type age: int
742 07cd723a Iustin Pop
    @param age: the minimum age in seconds
743 07cd723a Iustin Pop

744 07cd723a Iustin Pop
    """
745 07cd723a Iustin Pop
    logging.info("Archiving jobs with age more than %s seconds", age)
746 07cd723a Iustin Pop
747 07cd723a Iustin Pop
    now = time.time()
748 07cd723a Iustin Pop
    for jid in self._GetJobIDsUnlocked(archived=False):
749 07cd723a Iustin Pop
      job = self._LoadJobUnlocked(jid)
750 07cd723a Iustin Pop
      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
751 07cd723a Iustin Pop
                                  constants.OP_STATUS_ERROR,
752 07cd723a Iustin Pop
                                  constants.OP_STATUS_CANCELED):
753 07cd723a Iustin Pop
        continue
754 07cd723a Iustin Pop
      if job.end_timestamp is None:
755 07cd723a Iustin Pop
        if job.start_timestamp is None:
756 07cd723a Iustin Pop
          job_age = job.received_timestamp
757 07cd723a Iustin Pop
        else:
758 07cd723a Iustin Pop
          job_age = job.start_timestamp
759 07cd723a Iustin Pop
      else:
760 07cd723a Iustin Pop
        job_age = job.end_timestamp
761 07cd723a Iustin Pop
762 07cd723a Iustin Pop
      if age == -1 or now - job_age[0] > age:
763 07cd723a Iustin Pop
        self._ArchiveJobUnlocked(jid)
764 07cd723a Iustin Pop
765 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
766 e2715f69 Michael Hanselmann
    row = []
767 e2715f69 Michael Hanselmann
    for fname in fields:
768 e2715f69 Michael Hanselmann
      if fname == "id":
769 e2715f69 Michael Hanselmann
        row.append(job.id)
770 e2715f69 Michael Hanselmann
      elif fname == "status":
771 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
772 af30b2fd Michael Hanselmann
      elif fname == "ops":
773 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
774 af30b2fd Michael Hanselmann
      elif fname == "opresult":
775 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
776 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
777 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
778 5b23c34c Iustin Pop
      elif fname == "oplog":
779 5b23c34c Iustin Pop
        row.append([op.log for op in job.ops])
780 c56ec146 Iustin Pop
      elif fname == "opstart":
781 c56ec146 Iustin Pop
        row.append([op.start_timestamp for op in job.ops])
782 c56ec146 Iustin Pop
      elif fname == "opend":
783 c56ec146 Iustin Pop
        row.append([op.end_timestamp for op in job.ops])
784 c56ec146 Iustin Pop
      elif fname == "received_ts":
785 c56ec146 Iustin Pop
        row.append(job.received_timestamp)
786 c56ec146 Iustin Pop
      elif fname == "start_ts":
787 c56ec146 Iustin Pop
        row.append(job.start_timestamp)
788 c56ec146 Iustin Pop
      elif fname == "end_ts":
789 c56ec146 Iustin Pop
        row.append(job.end_timestamp)
790 60dd1473 Iustin Pop
      elif fname == "summary":
791 60dd1473 Iustin Pop
        row.append([op.input.Summary() for op in job.ops])
792 e2715f69 Michael Hanselmann
      else:
793 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
794 e2715f69 Michael Hanselmann
    return row
795 e2715f69 Michael Hanselmann
796 85f03e0d Michael Hanselmann
  @utils.LockedMethod
797 db37da70 Michael Hanselmann
  @_RequireOpenQueue
798 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
799 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
800 e2715f69 Michael Hanselmann

801 e2715f69 Michael Hanselmann
    Args:
802 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
803 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
804 e2715f69 Michael Hanselmann

805 e2715f69 Michael Hanselmann
    """
806 85f03e0d Michael Hanselmann
    jobs = []
807 e2715f69 Michael Hanselmann
808 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
809 85f03e0d Michael Hanselmann
      if job is None:
810 85f03e0d Michael Hanselmann
        jobs.append(None)
811 85f03e0d Michael Hanselmann
      else:
812 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
813 e2715f69 Michael Hanselmann
814 85f03e0d Michael Hanselmann
    return jobs
815 e2715f69 Michael Hanselmann
816 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
817 db37da70 Michael Hanselmann
  @_RequireOpenQueue
818 e2715f69 Michael Hanselmann
  def Shutdown(self):
819 e2715f69 Michael Hanselmann
    """Stops the job queue.
820 e2715f69 Michael Hanselmann

821 e2715f69 Michael Hanselmann
    """
822 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
823 85f03e0d Michael Hanselmann
824 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
825 04ab05ce Michael Hanselmann
    self._queue_lock = None