Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 57f8615f

History | View | Annotate | Download (14.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 f1da30e6 Michael Hanselmann
import os
25 e2715f69 Michael Hanselmann
import logging
26 e2715f69 Michael Hanselmann
import threading
27 f1da30e6 Michael Hanselmann
import errno
28 f1da30e6 Michael Hanselmann
import re
29 f1048938 Iustin Pop
import time
30 498ae1cc Iustin Pop
31 e2715f69 Michael Hanselmann
from ganeti import constants
32 f1da30e6 Michael Hanselmann
from ganeti import serializer
33 e2715f69 Michael Hanselmann
from ganeti import workerpool
34 f1da30e6 Michael Hanselmann
from ganeti import opcodes
35 7a1ecaed Iustin Pop
from ganeti import errors
36 e2715f69 Michael Hanselmann
from ganeti import mcpu
37 7996a135 Iustin Pop
from ganeti import utils
38 e2715f69 Michael Hanselmann
39 e2715f69 Michael Hanselmann
40 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
41 e2715f69 Michael Hanselmann
42 498ae1cc Iustin Pop
43 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log; entries appended by
  Log() are tuples of the form (timestamp, level, message).

  """
  def __init__(self, op):
    """Wrap a newly submitted opcode.

    The opcode starts with status constants.OP_STATUS_QUEUED, no
    result and an empty execution log.

    Args:
    - op: The opcode to wrap

    """
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    """Initialize all attributes.

    Shared by __init__ and Restore so that both construction paths
    produce the same set of attributes and a fresh lock.

    """
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    """Rebuild a queued opcode from its serialized form.

    Args:
    - state: Dictionary with the keys written by Serialize ("input",
      "status", "result" and "log")

    Returns: A new instance carrying the saved status, result and log.

    """
    # __init__ is bypassed on purpose: it would reset status/result/log
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    """Return the opcode state as a dictionary.

    The result is the counterpart of what Restore expects.

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    Can be called either as Log(message), in which case the entry is
    recorded with type constants.ELOG_MESSAGE, or as
    Log(log_type, message).

    """
    # Exactly one or two arguments are supported; the previous check
    # ("< 2") rejected the two-argument form handled below.
    assert len(args) in (1, 2), "Log expects (message) or (type, message)"

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    Args:
    - start_at: Index of the first entry to return; it is used as a
      slice start, so negative values count from the end of the log

    Returns: A new list with the selected log entries.

    """
    return self.log[start_at:]
127 f1048938 Iustin Pop
128 e2715f69 Michael Hanselmann
129 e2715f69 Michael Hanselmann
class _QueuedJob(object):
130 e2715f69 Michael Hanselmann
  """In-memory job representation.
131 e2715f69 Michael Hanselmann

132 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
133 e2715f69 Michael Hanselmann

134 e2715f69 Michael Hanselmann
  """
135 f1da30e6 Michael Hanselmann
  def __init__(self, storage, job_id, ops):
136 e2715f69 Michael Hanselmann
    if not ops:
137 e2715f69 Michael Hanselmann
      # TODO
138 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
139 e2715f69 Michael Hanselmann
140 f1048938 Iustin Pop
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
141 e2715f69 Michael Hanselmann
142 f1048938 Iustin Pop
  def __Setup(self, storage, job_id, ops, run_op_index):
143 f1048938 Iustin Pop
    self._lock = threading.Lock()
144 f1da30e6 Michael Hanselmann
    self.storage = storage
145 f1da30e6 Michael Hanselmann
    self.id = job_id
146 f1da30e6 Michael Hanselmann
    self._ops = ops
147 f1048938 Iustin Pop
    self.run_op_index = run_op_index
148 f1da30e6 Michael Hanselmann
149 f1da30e6 Michael Hanselmann
  @classmethod
150 f1da30e6 Michael Hanselmann
  def Restore(cls, storage, state):
151 f1da30e6 Michael Hanselmann
    obj = object.__new__(cls)
152 f1048938 Iustin Pop
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
153 f1048938 Iustin Pop
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
154 f1da30e6 Michael Hanselmann
    return obj
155 f1da30e6 Michael Hanselmann
156 f1da30e6 Michael Hanselmann
  def Serialize(self):
157 f1da30e6 Michael Hanselmann
    return {
158 f1da30e6 Michael Hanselmann
      "id": self.id,
159 f1da30e6 Michael Hanselmann
      "ops": [op.Serialize() for op in self._ops],
160 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
161 f1da30e6 Michael Hanselmann
      }
162 f1da30e6 Michael Hanselmann
163 f1da30e6 Michael Hanselmann
  def SetUnclean(self, msg):
164 f1da30e6 Michael Hanselmann
    try:
165 f1da30e6 Michael Hanselmann
      for op in self._ops:
166 f1da30e6 Michael Hanselmann
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
167 f1da30e6 Michael Hanselmann
    finally:
168 f1da30e6 Michael Hanselmann
      self.storage.UpdateJob(self)
169 e2715f69 Michael Hanselmann
170 307149a8 Iustin Pop
  def GetStatus(self):
171 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
172 e2715f69 Michael Hanselmann
173 e2715f69 Michael Hanselmann
    all_success = True
174 e2715f69 Michael Hanselmann
    for op in self._ops:
175 307149a8 Iustin Pop
      op_status = op.GetStatus()
176 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_SUCCESS:
177 e2715f69 Michael Hanselmann
        continue
178 e2715f69 Michael Hanselmann
179 e2715f69 Michael Hanselmann
      all_success = False
180 e2715f69 Michael Hanselmann
181 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_QUEUED:
182 e2715f69 Michael Hanselmann
        pass
183 307149a8 Iustin Pop
      elif op_status == constants.OP_STATUS_RUNNING:
184 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
185 f1da30e6 Michael Hanselmann
      elif op_status == constants.OP_STATUS_ERROR:
186 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
187 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
188 f1da30e6 Michael Hanselmann
        break
189 e2715f69 Michael Hanselmann
190 e2715f69 Michael Hanselmann
    if all_success:
191 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
192 e2715f69 Michael Hanselmann
193 e2715f69 Michael Hanselmann
    return status
194 e2715f69 Michael Hanselmann
195 f1048938 Iustin Pop
  @utils.LockedMethod
196 f1048938 Iustin Pop
  def GetRunOpIndex(self):
197 f1048938 Iustin Pop
    return self.run_op_index
198 f1048938 Iustin Pop
199 e2715f69 Michael Hanselmann
  def Run(self, proc):
200 e2715f69 Michael Hanselmann
    """Job executor.
201 e2715f69 Michael Hanselmann

202 e2715f69 Michael Hanselmann
    This functions processes a this job in the context of given processor
203 e2715f69 Michael Hanselmann
    instance.
204 e2715f69 Michael Hanselmann

205 e2715f69 Michael Hanselmann
    Args:
206 e2715f69 Michael Hanselmann
    - proc: Ganeti Processor to run the job with
207 e2715f69 Michael Hanselmann

208 e2715f69 Michael Hanselmann
    """
209 e2715f69 Michael Hanselmann
    try:
210 c8549bfd Michael Hanselmann
      count = len(self._ops)
211 c8549bfd Michael Hanselmann
      for idx, op in enumerate(self._ops):
212 e2715f69 Michael Hanselmann
        try:
213 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
214 f1048938 Iustin Pop
215 f1048938 Iustin Pop
          self._lock.acquire()
216 f1048938 Iustin Pop
          try:
217 f1048938 Iustin Pop
            self.run_op_index = idx
218 f1048938 Iustin Pop
          finally:
219 f1048938 Iustin Pop
            self._lock.release()
220 f1048938 Iustin Pop
221 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
222 f1da30e6 Michael Hanselmann
          self.storage.UpdateJob(self)
223 e2715f69 Michael Hanselmann
224 f1048938 Iustin Pop
          result = proc.ExecOpCode(op.input, op.Log)
225 e2715f69 Michael Hanselmann
226 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
227 f1da30e6 Michael Hanselmann
          self.storage.UpdateJob(self)
228 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Successfully finished %s",
229 307149a8 Iustin Pop
                        idx + 1, count, op)
230 e2715f69 Michael Hanselmann
        except Exception, err:
231 f1da30e6 Michael Hanselmann
          try:
232 f1da30e6 Michael Hanselmann
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
233 f1da30e6 Michael Hanselmann
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
234 f1da30e6 Michael Hanselmann
          finally:
235 f1da30e6 Michael Hanselmann
            self.storage.UpdateJob(self)
236 e2715f69 Michael Hanselmann
          raise
237 e2715f69 Michael Hanselmann
238 e2715f69 Michael Hanselmann
    except errors.GenericError, err:
239 e2715f69 Michael Hanselmann
      logging.error("ganeti exception %s", exc_info=err)
240 e2715f69 Michael Hanselmann
    except Exception, err:
241 e2715f69 Michael Hanselmann
      logging.error("unhandled exception %s", exc_info=err)
242 e2715f69 Michael Hanselmann
    except:
243 e2715f69 Michael Hanselmann
      logging.error("unhandled unknown exception %s", exc_info=err)
244 e2715f69 Michael Hanselmann
245 e2715f69 Michael Hanselmann
246 e2715f69 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker executing the jobs handed to it as tasks.

  """
  def RunTask(self, job):
    """Run a single job with a newly created processor.

    The final job status is logged even if running the job raises.

    Args:
    - job: The job to execute

    """
    logging.debug("Worker %s processing job %s", self.worker_id, job.id)
    # TODO: feedback function
    processor = mcpu.Processor(self.pool.context)
    try:
      job.Run(processor)
    finally:
      worker_id = self.worker_id
      job_id = job.id
      final_status = job.GetStatus()
      logging.debug("Worker %s finished job %s, status = %s",
                    worker_id, job_id, final_status)
257 e2715f69 Michael Hanselmann
258 e2715f69 Michael Hanselmann
259 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool of JOBQUEUE_THREADS job queue workers.

  The context is kept on the pool so that the workers can reach it.

  """
  def __init__(self, context):
    """Set up the pool and remember the context.

    Args:
    - context: Object stored as the 'context' attribute of the pool

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.context = context
264 e2715f69 Michael Hanselmann
265 e2715f69 Michael Hanselmann
266 f1da30e6 Michael Hanselmann
class JobStorage(object):
267 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
268 f1da30e6 Michael Hanselmann
269 f1da30e6 Michael Hanselmann
  def __init__(self):
270 f1da30e6 Michael Hanselmann
    self._lock = threading.Lock()
271 ac0930b9 Iustin Pop
    self._memcache = {}
272 f1da30e6 Michael Hanselmann
273 f1da30e6 Michael Hanselmann
    # Make sure our directory exists
274 f1da30e6 Michael Hanselmann
    try:
275 f1da30e6 Michael Hanselmann
      os.mkdir(constants.QUEUE_DIR, 0700)
276 f1da30e6 Michael Hanselmann
    except OSError, err:
277 f1da30e6 Michael Hanselmann
      if err.errno not in (errno.EEXIST, ):
278 f1da30e6 Michael Hanselmann
        raise
279 f1da30e6 Michael Hanselmann
280 f1da30e6 Michael Hanselmann
    # Get queue lock
281 f1da30e6 Michael Hanselmann
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
282 f1da30e6 Michael Hanselmann
    try:
283 f1da30e6 Michael Hanselmann
      utils.LockFile(self.lock_fd)
284 f1da30e6 Michael Hanselmann
    except:
285 f1da30e6 Michael Hanselmann
      self.lock_fd.close()
286 f1da30e6 Michael Hanselmann
      raise
287 f1da30e6 Michael Hanselmann
288 f1da30e6 Michael Hanselmann
    # Read version
289 f1da30e6 Michael Hanselmann
    try:
290 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
291 f1da30e6 Michael Hanselmann
    except IOError, err:
292 f1da30e6 Michael Hanselmann
      if err.errno not in (errno.ENOENT, ):
293 f1da30e6 Michael Hanselmann
        raise
294 f1da30e6 Michael Hanselmann
295 f1da30e6 Michael Hanselmann
      # Setup a new queue
296 f1da30e6 Michael Hanselmann
      self._InitQueueUnlocked()
297 f1da30e6 Michael Hanselmann
298 f1da30e6 Michael Hanselmann
      # Try to open again
299 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
300 f1da30e6 Michael Hanselmann
301 f1da30e6 Michael Hanselmann
    try:
302 f1da30e6 Michael Hanselmann
      # Try to read version
303 f1da30e6 Michael Hanselmann
      version = int(version_fd.read(128))
304 f1da30e6 Michael Hanselmann
305 f1da30e6 Michael Hanselmann
      # Verify version
306 f1da30e6 Michael Hanselmann
      if version != constants.JOB_QUEUE_VERSION:
307 f1da30e6 Michael Hanselmann
        raise errors.JobQueueError("Found version %s, expected %s",
308 f1da30e6 Michael Hanselmann
                                   version, constants.JOB_QUEUE_VERSION)
309 f1da30e6 Michael Hanselmann
    finally:
310 f1da30e6 Michael Hanselmann
      version_fd.close()
311 f1da30e6 Michael Hanselmann
312 f1da30e6 Michael Hanselmann
    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
313 f1da30e6 Michael Hanselmann
    try:
314 f1da30e6 Michael Hanselmann
      # Read last serial
315 f1da30e6 Michael Hanselmann
      self._last_serial = int(serial_fd.read(1024).strip())
316 f1da30e6 Michael Hanselmann
    finally:
317 f1da30e6 Michael Hanselmann
      serial_fd.close()
318 f1da30e6 Michael Hanselmann
319 f1da30e6 Michael Hanselmann
  def Close(self):
320 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
321 f1da30e6 Michael Hanselmann
322 f1da30e6 Michael Hanselmann
    self.lock_fd.close()
323 f1da30e6 Michael Hanselmann
    self.lock_fd = None
324 f1da30e6 Michael Hanselmann
325 f1da30e6 Michael Hanselmann
  def _InitQueueUnlocked(self):
326 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
327 f1da30e6 Michael Hanselmann
328 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
329 f1da30e6 Michael Hanselmann
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
330 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
331 f1da30e6 Michael Hanselmann
                    data="%s\n" % 0)
332 f1da30e6 Michael Hanselmann
333 f1da30e6 Michael Hanselmann
  def _NewSerialUnlocked(self):
334 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
335 f1da30e6 Michael Hanselmann

336 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
337 f1da30e6 Michael Hanselmann

338 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
339 f1da30e6 Michael Hanselmann

340 f1da30e6 Michael Hanselmann
    """
341 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
342 f1da30e6 Michael Hanselmann
343 f1da30e6 Michael Hanselmann
    # New number
344 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
345 f1da30e6 Michael Hanselmann
346 f1da30e6 Michael Hanselmann
    # Write to file
347 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
348 f1da30e6 Michael Hanselmann
                    data="%s\n" % serial)
349 f1da30e6 Michael Hanselmann
350 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
351 f1da30e6 Michael Hanselmann
    self._last_serial = serial
352 f1da30e6 Michael Hanselmann
353 f1da30e6 Michael Hanselmann
    return serial
354 f1da30e6 Michael Hanselmann
355 f1da30e6 Michael Hanselmann
  def _GetJobPath(self, job_id):
356 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
357 f1da30e6 Michael Hanselmann
358 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
359 911a495b Iustin Pop
    """Return all known job IDs.
360 911a495b Iustin Pop

361 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
362 911a495b Iustin Pop
    included. Currently this argument is unused.
363 911a495b Iustin Pop

364 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
365 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
366 ac0930b9 Iustin Pop
    extra IDs).
367 ac0930b9 Iustin Pop

368 911a495b Iustin Pop
    """
369 911a495b Iustin Pop
    jfiles = self._ListJobFiles()
370 f0d874fe Iustin Pop
    jlist = [int(m.group(1)) for m in
371 f0d874fe Iustin Pop
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
372 f0d874fe Iustin Pop
    jlist.sort()
373 f0d874fe Iustin Pop
    return jlist
374 911a495b Iustin Pop
375 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
376 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
377 f1da30e6 Michael Hanselmann
378 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
379 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
380 f1da30e6 Michael Hanselmann
381 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
382 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
383 f1da30e6 Michael Hanselmann
384 ac0930b9 Iustin Pop
    if job_id in self._memcache:
385 ac0930b9 Iustin Pop
      logging.debug("Found job %d in memcache", job_id)
386 ac0930b9 Iustin Pop
      return self._memcache[job_id]
387 ac0930b9 Iustin Pop
388 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
389 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
390 f1da30e6 Michael Hanselmann
    try:
391 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
392 f1da30e6 Michael Hanselmann
    except IOError, err:
393 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
394 f1da30e6 Michael Hanselmann
        return None
395 f1da30e6 Michael Hanselmann
      raise
396 f1da30e6 Michael Hanselmann
    try:
397 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
398 f1da30e6 Michael Hanselmann
    finally:
399 f1da30e6 Michael Hanselmann
      fd.close()
400 f1da30e6 Michael Hanselmann
401 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
402 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
403 ac0930b9 Iustin Pop
    logging.debug("Added job %d to the cache", job_id)
404 ac0930b9 Iustin Pop
    return job
405 f1da30e6 Michael Hanselmann
406 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
407 911a495b Iustin Pop
    if not job_ids:
408 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
409 f1da30e6 Michael Hanselmann
410 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
411 f1da30e6 Michael Hanselmann
412 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
413 f1da30e6 Michael Hanselmann
  def GetJobs(self, job_ids):
414 f1da30e6 Michael Hanselmann
    return self._GetJobsUnlocked(job_ids)
415 f1da30e6 Michael Hanselmann
416 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
417 f1da30e6 Michael Hanselmann
  def AddJob(self, ops):
418 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
419 f1da30e6 Michael Hanselmann
420 f1da30e6 Michael Hanselmann
    # Get job identifier
421 f1da30e6 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
422 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
423 f1da30e6 Michael Hanselmann
424 f1da30e6 Michael Hanselmann
    # Write to disk
425 f1da30e6 Michael Hanselmann
    self._UpdateJobUnlocked(job)
426 f1da30e6 Michael Hanselmann
427 ac0930b9 Iustin Pop
    logging.debug("Added new job %d to the cache", job_id)
428 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
429 ac0930b9 Iustin Pop
430 f1da30e6 Michael Hanselmann
    return job
431 f1da30e6 Michael Hanselmann
432 f1da30e6 Michael Hanselmann
  def _UpdateJobUnlocked(self, job):
433 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
434 f1da30e6 Michael Hanselmann
435 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
436 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
437 f1da30e6 Michael Hanselmann
    utils.WriteFile(filename,
438 f1da30e6 Michael Hanselmann
                    data=serializer.DumpJson(job.Serialize(), indent=False))
439 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
440 ac0930b9 Iustin Pop
441 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
442 ac0930b9 Iustin Pop
    """Clean the memory cache.
443 ac0930b9 Iustin Pop

444 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
445 ac0930b9 Iustin Pop
    cleaned.
446 ac0930b9 Iustin Pop

447 ac0930b9 Iustin Pop
    """
448 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
449 ac0930b9 Iustin Pop
    for job in self._memcache.values():
450 57f8615f Michael Hanselmann
      if job.id in exclude:
451 ac0930b9 Iustin Pop
        continue
452 ac0930b9 Iustin Pop
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
453 ac0930b9 Iustin Pop
                                 constants.JOB_STATUS_RUNNING):
454 ac0930b9 Iustin Pop
        logging.debug("Cleaning job %d from the cache", job.id)
455 ac0930b9 Iustin Pop
        try:
456 ac0930b9 Iustin Pop
          del self._memcache[job.id]
457 ac0930b9 Iustin Pop
        except KeyError:
458 ac0930b9 Iustin Pop
          pass
459 f1da30e6 Michael Hanselmann
460 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
461 f1da30e6 Michael Hanselmann
  def UpdateJob(self, job):
462 f1da30e6 Michael Hanselmann
    return self._UpdateJobUnlocked(job)
463 f1da30e6 Michael Hanselmann
464 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
465 f1da30e6 Michael Hanselmann
    raise NotImplementedError()
466 f1da30e6 Michael Hanselmann
467 f1da30e6 Michael Hanselmann
468 e2715f69 Michael Hanselmann
class JobQueue:
  """The job queue.

  Jobs are kept in a JobStorage instance and executed by a pool of
  job queue workers.

  """
  def __init__(self, context):
    """Set up storage and worker pool and take care of existing jobs.

    Jobs found in queued state are handed to the worker pool, while
    jobs found in running state are marked as failed.

    Args:
    - context: Passed on to the worker pool

    """
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for old_job in self._jobs.GetJobs(None):
      old_status = old_job.GetStatus()

      if old_status == constants.JOB_STATUS_QUEUED:
        self._wpool.AddTask(old_job)
        continue

      if old_status == constants.JOB_STATUS_RUNNING:
        logging.warning("Unfinished job %s found: %s", old_job.id, old_job)
        old_job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    The job is stored first and then handed to the worker pool, which
    takes care of executing it.

    Args:
    - ops: Sequence of opcodes

    Returns: The ID of the new job.

    """
    new_job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(new_job)

    return new_job.id

  def ArchiveJob(self, job_id):
    """Not implemented.

    """
    raise NotImplementedError()

  def CancelJob(self, job_id):
    """Not implemented.

    """
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Return the values of the requested fields for one job.

    Args:
    - job: The job to query
    - fields: Names of fields to return

    Returns: List of values, in the order of the field names. Raises
    errors.OpExecError for unknown field names.

    """
    values = []
    for field in fields:
      if field == "id":
        value = job.id
      elif field == "status":
        value = job.GetStatus()
      elif field == "ops":
        value = [op.GetInput().__getstate__() for op in job._ops]
      elif field == "opresult":
        value = [op.GetResult() for op in job._ops]
      elif field == "opstatus":
        value = [op.GetStatus() for op in job._ops]
      elif field == "ticker":
        # Last log entry of the opcode started last, if there is one
        value = None
        op_index = job.GetRunOpIndex()
        if op_index >= 0:
          last_entries = job._ops[op_index].RetrieveLog(-1)
          # the log might be empty here
          if last_entries:
            value = last_entries[0]
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % field)
      values.append(value)
    return values

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: One entry per job, which is either the list of field
    values or None for jobs the storage did not return.

    """
    self._lock.acquire()
    try:
      rows = []

      for job in self._jobs.GetJobs(job_ids):
        if job is not None:
          info = self._GetJobInfo(job, fields)
        else:
          info = None
        rows.append(info)

      return rows
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    The workers are terminated before the job storage is closed.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()