Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 5bdce580

History | View | Annotate | Download (17.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 f1da30e6 Michael Hanselmann
import os
25 e2715f69 Michael Hanselmann
import logging
26 e2715f69 Michael Hanselmann
import threading
27 f1da30e6 Michael Hanselmann
import errno
28 f1da30e6 Michael Hanselmann
import re
29 f1048938 Iustin Pop
import time
30 498ae1cc Iustin Pop
31 e2715f69 Michael Hanselmann
from ganeti import constants
32 f1da30e6 Michael Hanselmann
from ganeti import serializer
33 e2715f69 Michael Hanselmann
from ganeti import workerpool
34 f1da30e6 Michael Hanselmann
from ganeti import opcodes
35 7a1ecaed Iustin Pop
from ganeti import errors
36 e2715f69 Michael Hanselmann
from ganeti import mcpu
37 7996a135 Iustin Pop
from ganeti import utils
38 c3f0a12f Iustin Pop
from ganeti import rpc
39 e2715f69 Michael Hanselmann
40 e2715f69 Michael Hanselmann
41 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
42 e2715f69 Michael Hanselmann
43 498ae1cc Iustin Pop
44 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
45 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
46 e2715f69 Michael Hanselmann

47 307149a8 Iustin Pop
  Access is synchronized by the '_lock' attribute.
48 e2715f69 Michael Hanselmann

49 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
50 f1048938 Iustin Pop
  of the form (timestamp, level, message).
51 f1048938 Iustin Pop

52 e2715f69 Michael Hanselmann
  """
53 85f03e0d Michael Hanselmann
  def __new__(cls, *args, **kwargs):
54 85f03e0d Michael Hanselmann
    obj = object.__new__(cls, *args, **kwargs)
55 85f03e0d Michael Hanselmann
    # Create a special lock for logging
56 85f03e0d Michael Hanselmann
    obj._log_lock = threading.Lock()
57 85f03e0d Michael Hanselmann
    return obj
58 f1da30e6 Michael Hanselmann
59 85f03e0d Michael Hanselmann
  def __init__(self, op):
60 85f03e0d Michael Hanselmann
    self.input = op
61 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
62 85f03e0d Michael Hanselmann
    self.result = None
63 85f03e0d Michael Hanselmann
    self.log = []
64 f1da30e6 Michael Hanselmann
65 f1da30e6 Michael Hanselmann
  @classmethod
66 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
67 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
68 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69 85f03e0d Michael Hanselmann
    obj.status = state["status"]
70 85f03e0d Michael Hanselmann
    obj.result = state["result"]
71 85f03e0d Michael Hanselmann
    obj.log = state["log"]
72 f1da30e6 Michael Hanselmann
    return obj
73 f1da30e6 Michael Hanselmann
74 f1da30e6 Michael Hanselmann
  def Serialize(self):
75 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
76 85f03e0d Michael Hanselmann
    try:
77 85f03e0d Michael Hanselmann
      return {
78 85f03e0d Michael Hanselmann
        "input": self.input.__getstate__(),
79 85f03e0d Michael Hanselmann
        "status": self.status,
80 85f03e0d Michael Hanselmann
        "result": self.result,
81 85f03e0d Michael Hanselmann
        "log": self.log,
82 85f03e0d Michael Hanselmann
        }
83 85f03e0d Michael Hanselmann
    finally:
84 85f03e0d Michael Hanselmann
      self._log_lock.release()
85 e2715f69 Michael Hanselmann
86 f1048938 Iustin Pop
  def Log(self, *args):
87 f1048938 Iustin Pop
    """Append a log entry.
88 f1048938 Iustin Pop

89 f1048938 Iustin Pop
    """
90 85f03e0d Michael Hanselmann
    assert len(args) < 3
91 f1048938 Iustin Pop
92 f1048938 Iustin Pop
    if len(args) == 1:
93 f1048938 Iustin Pop
      log_type = constants.ELOG_MESSAGE
94 f1048938 Iustin Pop
      log_msg = args[0]
95 f1048938 Iustin Pop
    else:
96 f1048938 Iustin Pop
      log_type, log_msg = args
97 f1048938 Iustin Pop
98 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
99 85f03e0d Michael Hanselmann
    try:
100 85f03e0d Michael Hanselmann
      self.log.append((time.time(), log_type, log_msg))
101 85f03e0d Michael Hanselmann
    finally:
102 85f03e0d Michael Hanselmann
      self._log_lock.release()
103 85f03e0d Michael Hanselmann
104 f1048938 Iustin Pop
  def RetrieveLog(self, start_at=0):
105 f1048938 Iustin Pop
    """Retrieve (a part of) the execution log.
106 f1048938 Iustin Pop

107 f1048938 Iustin Pop
    """
108 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
109 85f03e0d Michael Hanselmann
    try:
110 85f03e0d Michael Hanselmann
      return self.log[start_at:]
111 85f03e0d Michael Hanselmann
    finally:
112 85f03e0d Michael Hanselmann
      self._log_lock.release()
113 f1048938 Iustin Pop
114 e2715f69 Michael Hanselmann
115 e2715f69 Michael Hanselmann
class _QueuedJob(object):
116 e2715f69 Michael Hanselmann
  """In-memory job representation.
117 e2715f69 Michael Hanselmann

118 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
119 e2715f69 Michael Hanselmann

120 e2715f69 Michael Hanselmann
  """
121 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
122 e2715f69 Michael Hanselmann
    if not ops:
123 e2715f69 Michael Hanselmann
      # TODO
124 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
125 e2715f69 Michael Hanselmann
126 85f03e0d Michael Hanselmann
    self.queue = queue
127 f1da30e6 Michael Hanselmann
    self.id = job_id
128 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
129 85f03e0d Michael Hanselmann
    self.run_op_index = -1
130 f1da30e6 Michael Hanselmann
131 f1da30e6 Michael Hanselmann
  @classmethod
132 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
133 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
134 85f03e0d Michael Hanselmann
    obj.queue = queue
135 85f03e0d Michael Hanselmann
    obj.id = state["id"]
136 85f03e0d Michael Hanselmann
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
138 f1da30e6 Michael Hanselmann
    return obj
139 f1da30e6 Michael Hanselmann
140 f1da30e6 Michael Hanselmann
  def Serialize(self):
141 f1da30e6 Michael Hanselmann
    return {
142 f1da30e6 Michael Hanselmann
      "id": self.id,
143 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
144 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
145 f1da30e6 Michael Hanselmann
      }
146 f1da30e6 Michael Hanselmann
147 85f03e0d Michael Hanselmann
  def CalcStatus(self):
148 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
149 e2715f69 Michael Hanselmann
150 e2715f69 Michael Hanselmann
    all_success = True
151 85f03e0d Michael Hanselmann
    for op in self.ops:
152 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
153 e2715f69 Michael Hanselmann
        continue
154 e2715f69 Michael Hanselmann
155 e2715f69 Michael Hanselmann
      all_success = False
156 e2715f69 Michael Hanselmann
157 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
158 e2715f69 Michael Hanselmann
        pass
159 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
160 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
161 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
162 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
163 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
164 f1da30e6 Michael Hanselmann
        break
165 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
166 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
167 4cb1d919 Michael Hanselmann
        break
168 e2715f69 Michael Hanselmann
169 e2715f69 Michael Hanselmann
    if all_success:
170 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
171 e2715f69 Michael Hanselmann
172 e2715f69 Michael Hanselmann
    return status
173 e2715f69 Michael Hanselmann
174 f1048938 Iustin Pop
175 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
176 85f03e0d Michael Hanselmann
  def RunTask(self, job):
177 e2715f69 Michael Hanselmann
    """Job executor.
178 e2715f69 Michael Hanselmann

179 85f03e0d Michael Hanselmann
    This functions processes a job.
180 e2715f69 Michael Hanselmann

181 e2715f69 Michael Hanselmann
    """
182 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
183 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
184 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
185 85f03e0d Michael Hanselmann
    queue = job.queue
186 e2715f69 Michael Hanselmann
    try:
187 85f03e0d Michael Hanselmann
      try:
188 85f03e0d Michael Hanselmann
        count = len(job.ops)
189 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
190 85f03e0d Michael Hanselmann
          try:
191 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
192 85f03e0d Michael Hanselmann
193 85f03e0d Michael Hanselmann
            queue.acquire()
194 85f03e0d Michael Hanselmann
            try:
195 85f03e0d Michael Hanselmann
              job.run_op_index = idx
196 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
197 85f03e0d Michael Hanselmann
              op.result = None
198 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
199 85f03e0d Michael Hanselmann
200 38206f3c Iustin Pop
              input_opcode = op.input
201 85f03e0d Michael Hanselmann
            finally:
202 85f03e0d Michael Hanselmann
              queue.release()
203 85f03e0d Michael Hanselmann
204 38206f3c Iustin Pop
            result = proc.ExecOpCode(input_opcode, op.Log)
205 85f03e0d Michael Hanselmann
206 85f03e0d Michael Hanselmann
            queue.acquire()
207 85f03e0d Michael Hanselmann
            try:
208 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
209 85f03e0d Michael Hanselmann
              op.result = result
210 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
211 85f03e0d Michael Hanselmann
            finally:
212 85f03e0d Michael Hanselmann
              queue.release()
213 85f03e0d Michael Hanselmann
214 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
215 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
216 85f03e0d Michael Hanselmann
          except Exception, err:
217 85f03e0d Michael Hanselmann
            queue.acquire()
218 85f03e0d Michael Hanselmann
            try:
219 85f03e0d Michael Hanselmann
              try:
220 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
221 85f03e0d Michael Hanselmann
                op.result = str(err)
222 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
223 85f03e0d Michael Hanselmann
              finally:
224 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
225 85f03e0d Michael Hanselmann
            finally:
226 85f03e0d Michael Hanselmann
              queue.release()
227 85f03e0d Michael Hanselmann
            raise
228 85f03e0d Michael Hanselmann
229 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
230 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
231 85f03e0d Michael Hanselmann
      except:
232 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
233 e2715f69 Michael Hanselmann
    finally:
234 85f03e0d Michael Hanselmann
      queue.acquire()
235 85f03e0d Michael Hanselmann
      try:
236 85f03e0d Michael Hanselmann
        job_id = job.id
237 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
238 85f03e0d Michael Hanselmann
      finally:
239 85f03e0d Michael Hanselmann
        queue.release()
240 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
241 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
242 e2715f69 Michael Hanselmann
243 e2715f69 Michael Hanselmann
244 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
245 5bdce580 Michael Hanselmann
  def __init__(self, queue):
246 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
247 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
248 5bdce580 Michael Hanselmann
    self.queue = queue
249 e2715f69 Michael Hanselmann
250 e2715f69 Michael Hanselmann
251 85f03e0d Michael Hanselmann
class JobQueue(object):
252 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253 f1da30e6 Michael Hanselmann
254 85f03e0d Michael Hanselmann
  def __init__(self, context):
255 5bdce580 Michael Hanselmann
    self.context = context
256 ac0930b9 Iustin Pop
    self._memcache = {}
257 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
258 f1da30e6 Michael Hanselmann
259 85f03e0d Michael Hanselmann
    # Locking
260 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
261 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
262 85f03e0d Michael Hanselmann
    self.release = self._lock.release
263 85f03e0d Michael Hanselmann
264 0cb94105 Michael Hanselmann
    # Make sure our directories exists
265 0cb94105 Michael Hanselmann
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
266 0cb94105 Michael Hanselmann
      try:
267 0cb94105 Michael Hanselmann
        os.mkdir(path, 0700)
268 0cb94105 Michael Hanselmann
      except OSError, err:
269 0cb94105 Michael Hanselmann
        if err.errno not in (errno.EEXIST, ):
270 0cb94105 Michael Hanselmann
          raise
271 f1da30e6 Michael Hanselmann
272 f1da30e6 Michael Hanselmann
    # Get queue lock
273 f1da30e6 Michael Hanselmann
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
274 f1da30e6 Michael Hanselmann
    try:
275 f1da30e6 Michael Hanselmann
      utils.LockFile(self.lock_fd)
276 f1da30e6 Michael Hanselmann
    except:
277 f1da30e6 Michael Hanselmann
      self.lock_fd.close()
278 f1da30e6 Michael Hanselmann
      raise
279 f1da30e6 Michael Hanselmann
280 f1da30e6 Michael Hanselmann
    # Read version
281 f1da30e6 Michael Hanselmann
    try:
282 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
283 f1da30e6 Michael Hanselmann
    except IOError, err:
284 f1da30e6 Michael Hanselmann
      if err.errno not in (errno.ENOENT, ):
285 f1da30e6 Michael Hanselmann
        raise
286 f1da30e6 Michael Hanselmann
287 f1da30e6 Michael Hanselmann
      # Setup a new queue
288 f1da30e6 Michael Hanselmann
      self._InitQueueUnlocked()
289 f1da30e6 Michael Hanselmann
290 f1da30e6 Michael Hanselmann
      # Try to open again
291 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
292 f1da30e6 Michael Hanselmann
293 f1da30e6 Michael Hanselmann
    try:
294 f1da30e6 Michael Hanselmann
      # Try to read version
295 f1da30e6 Michael Hanselmann
      version = int(version_fd.read(128))
296 f1da30e6 Michael Hanselmann
297 f1da30e6 Michael Hanselmann
      # Verify version
298 f1da30e6 Michael Hanselmann
      if version != constants.JOB_QUEUE_VERSION:
299 f1da30e6 Michael Hanselmann
        raise errors.JobQueueError("Found version %s, expected %s",
300 f1da30e6 Michael Hanselmann
                                   version, constants.JOB_QUEUE_VERSION)
301 f1da30e6 Michael Hanselmann
    finally:
302 f1da30e6 Michael Hanselmann
      version_fd.close()
303 f1da30e6 Michael Hanselmann
304 c4beba1c Iustin Pop
    self._last_serial = self._ReadSerial()
305 c4beba1c Iustin Pop
    if self._last_serial is None:
306 c4beba1c Iustin Pop
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
307 c4beba1c Iustin Pop
                                      " file")
308 c4beba1c Iustin Pop
309 85f03e0d Michael Hanselmann
    # Setup worker pool
310 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
311 85f03e0d Michael Hanselmann
312 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
313 85f03e0d Michael Hanselmann
    # we're still doing our work.
314 85f03e0d Michael Hanselmann
    self.acquire()
315 85f03e0d Michael Hanselmann
    try:
316 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
317 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
318 85f03e0d Michael Hanselmann
319 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
320 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
321 85f03e0d Michael Hanselmann
322 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
323 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
324 85f03e0d Michael Hanselmann
          try:
325 85f03e0d Michael Hanselmann
            for op in job.ops:
326 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
327 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
328 85f03e0d Michael Hanselmann
          finally:
329 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
330 85f03e0d Michael Hanselmann
    finally:
331 85f03e0d Michael Hanselmann
      self.release()
332 85f03e0d Michael Hanselmann
333 c4beba1c Iustin Pop
  @staticmethod
334 c4beba1c Iustin Pop
  def _ReadSerial():
335 c4beba1c Iustin Pop
    """Try to read the job serial file.
336 c4beba1c Iustin Pop

337 c4beba1c Iustin Pop
    @rtype: None or int
338 c4beba1c Iustin Pop
    @return: If the serial can be read, then it is returned. Otherwise None
339 c4beba1c Iustin Pop
             is returned.
340 c4beba1c Iustin Pop

341 c4beba1c Iustin Pop
    """
342 f1da30e6 Michael Hanselmann
    try:
343 c4beba1c Iustin Pop
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
344 c4beba1c Iustin Pop
      try:
345 c4beba1c Iustin Pop
        # Read last serial
346 c4beba1c Iustin Pop
        serial = int(serial_fd.read(1024).strip())
347 c4beba1c Iustin Pop
      finally:
348 c4beba1c Iustin Pop
        serial_fd.close()
349 c4beba1c Iustin Pop
    except (ValueError, EnvironmentError):
350 c4beba1c Iustin Pop
      serial = None
351 c4beba1c Iustin Pop
352 c4beba1c Iustin Pop
    return serial
353 f1da30e6 Michael Hanselmann
354 f1da30e6 Michael Hanselmann
  def _InitQueueUnlocked(self):
355 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
356 f1da30e6 Michael Hanselmann
357 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
358 f1da30e6 Michael Hanselmann
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
359 c4beba1c Iustin Pop
    if self._ReadSerial() is None:
360 c4beba1c Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
361 c4beba1c Iustin Pop
                      data="%s\n" % 0)
362 f1da30e6 Michael Hanselmann
363 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
364 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
365 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
366 85f03e0d Michael Hanselmann
    if job_id < 0:
367 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
368 85f03e0d Michael Hanselmann
369 85f03e0d Michael Hanselmann
    return str(job_id)
370 85f03e0d Michael Hanselmann
371 c3f0a12f Iustin Pop
  def _NewSerialUnlocked(self, nodes):
372 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
373 f1da30e6 Michael Hanselmann

374 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
375 f1da30e6 Michael Hanselmann

376 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
377 f1da30e6 Michael Hanselmann

378 f1da30e6 Michael Hanselmann
    """
379 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
380 f1da30e6 Michael Hanselmann
381 f1da30e6 Michael Hanselmann
    # New number
382 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
383 f1da30e6 Michael Hanselmann
384 f1da30e6 Michael Hanselmann
    # Write to file
385 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
386 f1da30e6 Michael Hanselmann
                    data="%s\n" % serial)
387 f1da30e6 Michael Hanselmann
388 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
389 f1da30e6 Michael Hanselmann
    self._last_serial = serial
390 f1da30e6 Michael Hanselmann
391 c3f0a12f Iustin Pop
    # Distribute the serial to the other nodes
392 c3f0a12f Iustin Pop
    try:
393 c3f0a12f Iustin Pop
      nodes.remove(self._my_hostname)
394 c3f0a12f Iustin Pop
    except ValueError:
395 c3f0a12f Iustin Pop
      pass
396 c3f0a12f Iustin Pop
397 c3f0a12f Iustin Pop
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
398 c3f0a12f Iustin Pop
    for node in nodes:
399 c3f0a12f Iustin Pop
      if not result[node]:
400 c3f0a12f Iustin Pop
        logging.error("copy of job queue file to node %s failed", node)
401 c3f0a12f Iustin Pop
402 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
403 f1da30e6 Michael Hanselmann
404 85f03e0d Michael Hanselmann
  @staticmethod
405 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
406 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
407 f1da30e6 Michael Hanselmann
408 85f03e0d Michael Hanselmann
  @staticmethod
409 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
410 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
411 0cb94105 Michael Hanselmann
412 85f03e0d Michael Hanselmann
  @classmethod
413 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
414 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
415 fae737ac Michael Hanselmann
    if m:
416 fae737ac Michael Hanselmann
      return m.group(1)
417 fae737ac Michael Hanselmann
    else:
418 fae737ac Michael Hanselmann
      return None
419 fae737ac Michael Hanselmann
420 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
421 911a495b Iustin Pop
    """Return all known job IDs.
422 911a495b Iustin Pop

423 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
424 911a495b Iustin Pop
    included. Currently this argument is unused.
425 911a495b Iustin Pop

426 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
427 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
428 ac0930b9 Iustin Pop
    extra IDs).
429 ac0930b9 Iustin Pop

430 911a495b Iustin Pop
    """
431 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
432 f0d874fe Iustin Pop
    jlist.sort()
433 f0d874fe Iustin Pop
    return jlist
434 911a495b Iustin Pop
435 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
436 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
437 f1da30e6 Michael Hanselmann
438 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
439 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
440 f1da30e6 Michael Hanselmann
441 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
442 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
443 f1da30e6 Michael Hanselmann
444 ac0930b9 Iustin Pop
    if job_id in self._memcache:
445 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
446 ac0930b9 Iustin Pop
      return self._memcache[job_id]
447 ac0930b9 Iustin Pop
448 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
449 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
450 f1da30e6 Michael Hanselmann
    try:
451 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
452 f1da30e6 Michael Hanselmann
    except IOError, err:
453 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
454 f1da30e6 Michael Hanselmann
        return None
455 f1da30e6 Michael Hanselmann
      raise
456 f1da30e6 Michael Hanselmann
    try:
457 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
458 f1da30e6 Michael Hanselmann
    finally:
459 f1da30e6 Michael Hanselmann
      fd.close()
460 f1da30e6 Michael Hanselmann
461 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
462 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
463 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
464 ac0930b9 Iustin Pop
    return job
465 f1da30e6 Michael Hanselmann
466 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
467 911a495b Iustin Pop
    if not job_ids:
468 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
469 f1da30e6 Michael Hanselmann
470 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
471 f1da30e6 Michael Hanselmann
472 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
473 85f03e0d Michael Hanselmann
  def SubmitJob(self, ops, nodes):
474 85f03e0d Michael Hanselmann
    """Create and store a new job.
475 f1da30e6 Michael Hanselmann

476 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
477 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
478 c3f0a12f Iustin Pop

479 c3f0a12f Iustin Pop
    @type ops: list
480 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
481 c3f0a12f Iustin Pop
    @type nodes: list
482 c3f0a12f Iustin Pop
    @param nodes: The list of nodes to which the new job serial will be
483 c3f0a12f Iustin Pop
                  distributed.
484 c3f0a12f Iustin Pop

485 c3f0a12f Iustin Pop
    """
486 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
487 f1da30e6 Michael Hanselmann
488 f1da30e6 Michael Hanselmann
    # Get job identifier
489 c3f0a12f Iustin Pop
    job_id = self._NewSerialUnlocked(nodes)
490 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
491 f1da30e6 Michael Hanselmann
492 f1da30e6 Michael Hanselmann
    # Write to disk
493 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
494 f1da30e6 Michael Hanselmann
495 205d71fd Michael Hanselmann
    logging.debug("Added new job %s to the cache", job_id)
496 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
497 ac0930b9 Iustin Pop
498 85f03e0d Michael Hanselmann
    # Add to worker pool
499 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
500 85f03e0d Michael Hanselmann
501 85f03e0d Michael Hanselmann
    return job.id
502 f1da30e6 Michael Hanselmann
503 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
504 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
505 f1da30e6 Michael Hanselmann
506 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
507 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
508 f1da30e6 Michael Hanselmann
    utils.WriteFile(filename,
509 f1da30e6 Michael Hanselmann
                    data=serializer.DumpJson(job.Serialize(), indent=False))
510 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
511 ac0930b9 Iustin Pop
512 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
513 ac0930b9 Iustin Pop
    """Clean the memory cache.
514 ac0930b9 Iustin Pop

515 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
516 ac0930b9 Iustin Pop
    cleaned.
517 ac0930b9 Iustin Pop

518 ac0930b9 Iustin Pop
    """
519 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
520 85f03e0d Michael Hanselmann
521 ac0930b9 Iustin Pop
    for job in self._memcache.values():
522 57f8615f Michael Hanselmann
      if job.id in exclude:
523 ac0930b9 Iustin Pop
        continue
524 85f03e0d Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
525 85f03e0d Michael Hanselmann
                                  constants.JOB_STATUS_RUNNING):
526 205d71fd Michael Hanselmann
        logging.debug("Cleaning job %s from the cache", job.id)
527 ac0930b9 Iustin Pop
        try:
528 ac0930b9 Iustin Pop
          del self._memcache[job.id]
529 ac0930b9 Iustin Pop
        except KeyError:
530 ac0930b9 Iustin Pop
          pass
531 f1da30e6 Michael Hanselmann
532 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
533 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
534 188c5e0a Michael Hanselmann
    """Cancels a job.
535 188c5e0a Michael Hanselmann

536 188c5e0a Michael Hanselmann
    @type job_id: string
537 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
538 188c5e0a Michael Hanselmann

539 188c5e0a Michael Hanselmann
    """
540 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
541 188c5e0a Michael Hanselmann
542 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
543 188c5e0a Michael Hanselmann
    if not job:
544 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
545 188c5e0a Michael Hanselmann
      return
546 188c5e0a Michael Hanselmann
547 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
548 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
549 188c5e0a Michael Hanselmann
      return
550 188c5e0a Michael Hanselmann
551 85f03e0d Michael Hanselmann
    try:
552 85f03e0d Michael Hanselmann
      for op in job.ops:
553 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
554 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
555 85f03e0d Michael Hanselmann
    finally:
556 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
557 188c5e0a Michael Hanselmann
558 c609f802 Michael Hanselmann
  @utils.LockedMethod
559 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
560 c609f802 Michael Hanselmann
    """Archives a job.
561 c609f802 Michael Hanselmann

562 c609f802 Michael Hanselmann
    @type job_id: string
563 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
564 c609f802 Michael Hanselmann

565 c609f802 Michael Hanselmann
    """
566 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
567 c609f802 Michael Hanselmann
568 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
569 c609f802 Michael Hanselmann
    if not job:
570 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
571 c609f802 Michael Hanselmann
      return
572 c609f802 Michael Hanselmann
573 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
574 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
575 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
576 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
577 c609f802 Michael Hanselmann
      return
578 c609f802 Michael Hanselmann
579 c609f802 Michael Hanselmann
    try:
580 c609f802 Michael Hanselmann
      old = self._GetJobPath(job.id)
581 c609f802 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
582 c609f802 Michael Hanselmann
583 c609f802 Michael Hanselmann
      os.rename(old, new)
584 c609f802 Michael Hanselmann
585 c609f802 Michael Hanselmann
      logging.debug("Successfully archived job %s", job.id)
586 c609f802 Michael Hanselmann
    finally:
587 c609f802 Michael Hanselmann
      # Cleaning the cache because we don't know what os.rename actually did
588 c609f802 Michael Hanselmann
      # and to be on the safe side.
589 c609f802 Michael Hanselmann
      self._CleanCacheUnlocked([])
590 f1da30e6 Michael Hanselmann
591 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
592 e2715f69 Michael Hanselmann
    row = []
593 e2715f69 Michael Hanselmann
    for fname in fields:
594 e2715f69 Michael Hanselmann
      if fname == "id":
595 e2715f69 Michael Hanselmann
        row.append(job.id)
596 e2715f69 Michael Hanselmann
      elif fname == "status":
597 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
598 af30b2fd Michael Hanselmann
      elif fname == "ops":
599 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
600 af30b2fd Michael Hanselmann
      elif fname == "opresult":
601 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
602 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
603 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
604 f1048938 Iustin Pop
      elif fname == "ticker":
605 85f03e0d Michael Hanselmann
        ji = job.run_op_index
606 f1048938 Iustin Pop
        if ji < 0:
607 f1048938 Iustin Pop
          lmsg = None
608 f1048938 Iustin Pop
        else:
609 85f03e0d Michael Hanselmann
          lmsg = job.ops[ji].RetrieveLog(-1)
610 f1048938 Iustin Pop
          # message might be empty here
611 f1048938 Iustin Pop
          if lmsg:
612 f1048938 Iustin Pop
            lmsg = lmsg[0]
613 f1048938 Iustin Pop
          else:
614 f1048938 Iustin Pop
            lmsg = None
615 f1048938 Iustin Pop
        row.append(lmsg)
616 e2715f69 Michael Hanselmann
      else:
617 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
618 e2715f69 Michael Hanselmann
    return row
619 e2715f69 Michael Hanselmann
620 85f03e0d Michael Hanselmann
  @utils.LockedMethod
621 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
622 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
623 e2715f69 Michael Hanselmann

624 e2715f69 Michael Hanselmann
    Args:
625 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
626 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
627 e2715f69 Michael Hanselmann

628 e2715f69 Michael Hanselmann
    """
629 85f03e0d Michael Hanselmann
    jobs = []
630 e2715f69 Michael Hanselmann
631 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
632 85f03e0d Michael Hanselmann
      if job is None:
633 85f03e0d Michael Hanselmann
        jobs.append(None)
634 85f03e0d Michael Hanselmann
      else:
635 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
636 e2715f69 Michael Hanselmann
637 85f03e0d Michael Hanselmann
    return jobs
638 e2715f69 Michael Hanselmann
639 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
640 e2715f69 Michael Hanselmann
  def Shutdown(self):
641 e2715f69 Michael Hanselmann
    """Stops the job queue.
642 e2715f69 Michael Hanselmann

643 e2715f69 Michael Hanselmann
    """
644 85f03e0d Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
645 85f03e0d Michael Hanselmann
646 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
647 85f03e0d Michael Hanselmann
648 85f03e0d Michael Hanselmann
    self.lock_fd.close()
649 85f03e0d Michael Hanselmann
    self.lock_fd = None