Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ ac0930b9

History | View | Annotate | Download (13.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 f1da30e6 Michael Hanselmann
import os
import logging
import threading
import errno
import re
import stat
import sys

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
37 e2715f69 Michael Hanselmann
38 e2715f69 Michael Hanselmann
39 e2715f69 Michael Hanselmann
# Passed by _JobQueueWorkerPool as the first argument of the
# workerpool.WorkerPool constructor; presumably the number of worker
# threads that process jobs -- confirm against workerpool.
JOBQUEUE_THREADS = 5
40 e2715f69 Michael Hanselmann
41 498ae1cc Iustin Pop
42 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Besides the opcode itself ('input') the object records the opcode's
  current status and its result. Access is synchronized by the '_lock'
  attribute: the accessor methods are wrapped in utils.LockedMethod.

  """
  def __init__(self, op):
    # A newly wrapped opcode starts out queued and has no result yet
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)

  def __Setup(self, op_input, op_status, op_result):
    """Set all instance attributes.

    Shared by __init__ and Restore so that both construction paths
    produce objects with the same attributes.

    """
    self.input = op_input
    self.status = op_status
    self.result = op_result
    self._lock = threading.Lock()

  @classmethod
  def Restore(cls, state):
    """Rebuild an instance from a dict as returned by Serialize.

    __init__ is bypassed so the stored status and result are used
    instead of the defaults for a new opcode.

    """
    restored = object.__new__(cls)
    opcode = opcodes.OpCode.LoadOpCode(state["input"])
    restored.__Setup(opcode, state["status"], state["result"])
    return restored

  @utils.LockedMethod
  def Serialize(self):
    """Return a dict with the opcode's input state, status and result.

    """
    serialized = {}
    serialized["input"] = self.input.__getstate__()
    serialized["status"] = self.status
    serialized["result"] = self.result
    return serialized

  @utils.LockedMethod
  def GetInput(self):
    """Return the opcode this object was created (or restored) with.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Store a new status together with the matching result.

    """
    self.result = result
    self.status = status

  @utils.LockedMethod
  def GetStatus(self):
    """Return the current opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Return the current opcode result.

    """
    return self.result
100 e2715f69 Michael Hanselmann
101 e2715f69 Michael Hanselmann
102 e2715f69 Michael Hanselmann
class _QueuedJob(object):
103 e2715f69 Michael Hanselmann
  """In-memory job representation.
104 e2715f69 Michael Hanselmann

105 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
106 e2715f69 Michael Hanselmann

107 e2715f69 Michael Hanselmann
  """
108 f1da30e6 Michael Hanselmann
  def __init__(self, storage, job_id, ops):
109 e2715f69 Michael Hanselmann
    if not ops:
110 e2715f69 Michael Hanselmann
      # TODO
111 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
112 e2715f69 Michael Hanselmann
113 f1da30e6 Michael Hanselmann
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
114 e2715f69 Michael Hanselmann
115 f1da30e6 Michael Hanselmann
  def __Setup(self, storage, job_id, ops):
116 f1da30e6 Michael Hanselmann
    self.storage = storage
117 f1da30e6 Michael Hanselmann
    self.id = job_id
118 f1da30e6 Michael Hanselmann
    self._ops = ops
119 f1da30e6 Michael Hanselmann
120 f1da30e6 Michael Hanselmann
  @classmethod
121 f1da30e6 Michael Hanselmann
  def Restore(cls, storage, state):
122 f1da30e6 Michael Hanselmann
    obj = object.__new__(cls)
123 f1da30e6 Michael Hanselmann
    obj.__Setup(storage, state["id"],
124 f1da30e6 Michael Hanselmann
                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
125 f1da30e6 Michael Hanselmann
    return obj
126 f1da30e6 Michael Hanselmann
127 f1da30e6 Michael Hanselmann
  def Serialize(self):
128 f1da30e6 Michael Hanselmann
    return {
129 f1da30e6 Michael Hanselmann
      "id": self.id,
130 f1da30e6 Michael Hanselmann
      "ops": [op.Serialize() for op in self._ops],
131 f1da30e6 Michael Hanselmann
      }
132 f1da30e6 Michael Hanselmann
133 f1da30e6 Michael Hanselmann
  def SetUnclean(self, msg):
134 f1da30e6 Michael Hanselmann
    try:
135 f1da30e6 Michael Hanselmann
      for op in self._ops:
136 f1da30e6 Michael Hanselmann
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
137 f1da30e6 Michael Hanselmann
    finally:
138 f1da30e6 Michael Hanselmann
      self.storage.UpdateJob(self)
139 e2715f69 Michael Hanselmann
140 307149a8 Iustin Pop
  def GetStatus(self):
141 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
142 e2715f69 Michael Hanselmann
143 e2715f69 Michael Hanselmann
    all_success = True
144 e2715f69 Michael Hanselmann
    for op in self._ops:
145 307149a8 Iustin Pop
      op_status = op.GetStatus()
146 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_SUCCESS:
147 e2715f69 Michael Hanselmann
        continue
148 e2715f69 Michael Hanselmann
149 e2715f69 Michael Hanselmann
      all_success = False
150 e2715f69 Michael Hanselmann
151 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_QUEUED:
152 e2715f69 Michael Hanselmann
        pass
153 307149a8 Iustin Pop
      elif op_status == constants.OP_STATUS_RUNNING:
154 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
155 f1da30e6 Michael Hanselmann
      elif op_status == constants.OP_STATUS_ERROR:
156 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
157 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
158 f1da30e6 Michael Hanselmann
        break
159 e2715f69 Michael Hanselmann
160 e2715f69 Michael Hanselmann
    if all_success:
161 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
162 e2715f69 Michael Hanselmann
163 e2715f69 Michael Hanselmann
    return status
164 e2715f69 Michael Hanselmann
165 e2715f69 Michael Hanselmann
  def Run(self, proc):
166 e2715f69 Michael Hanselmann
    """Job executor.
167 e2715f69 Michael Hanselmann

168 e2715f69 Michael Hanselmann
    This functions processes a this job in the context of given processor
169 e2715f69 Michael Hanselmann
    instance.
170 e2715f69 Michael Hanselmann

171 e2715f69 Michael Hanselmann
    Args:
172 e2715f69 Michael Hanselmann
    - proc: Ganeti Processor to run the job with
173 e2715f69 Michael Hanselmann

174 e2715f69 Michael Hanselmann
    """
175 e2715f69 Michael Hanselmann
    try:
176 c8549bfd Michael Hanselmann
      count = len(self._ops)
177 c8549bfd Michael Hanselmann
      for idx, op in enumerate(self._ops):
178 e2715f69 Michael Hanselmann
        try:
179 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
180 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
181 f1da30e6 Michael Hanselmann
          self.storage.UpdateJob(self)
182 e2715f69 Michael Hanselmann
183 e2715f69 Michael Hanselmann
          result = proc.ExecOpCode(op.input)
184 e2715f69 Michael Hanselmann
185 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
186 f1da30e6 Michael Hanselmann
          self.storage.UpdateJob(self)
187 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Successfully finished %s",
188 307149a8 Iustin Pop
                        idx + 1, count, op)
189 e2715f69 Michael Hanselmann
        except Exception, err:
190 f1da30e6 Michael Hanselmann
          try:
191 f1da30e6 Michael Hanselmann
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
192 f1da30e6 Michael Hanselmann
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
193 f1da30e6 Michael Hanselmann
          finally:
194 f1da30e6 Michael Hanselmann
            self.storage.UpdateJob(self)
195 e2715f69 Michael Hanselmann
          raise
196 e2715f69 Michael Hanselmann
197 e2715f69 Michael Hanselmann
    except errors.GenericError, err:
198 e2715f69 Michael Hanselmann
      logging.error("ganeti exception %s", exc_info=err)
199 e2715f69 Michael Hanselmann
    except Exception, err:
200 e2715f69 Michael Hanselmann
      logging.error("unhandled exception %s", exc_info=err)
201 e2715f69 Michael Hanselmann
    except:
202 e2715f69 Michael Hanselmann
      logging.error("unhandled unknown exception %s", exc_info=err)
203 e2715f69 Michael Hanselmann
204 e2715f69 Michael Hanselmann
205 e2715f69 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker of the job queue's pool; each task is one job to run.

  """
  def RunTask(self, job):
    """Execute a job with a newly created mcpu.Processor.

    The processor is built from the context stored on the owning pool.
    The job's final status is logged even if running it raises.

    """
    def _DiscardFeedback(x):
      # TODO: feedback function
      return None

    logging.debug("Worker %s processing job %s", self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.context, feedback=_DiscardFeedback)
    try:
      job.Run(proc)
    finally:
      log_args = (self.worker_id, job.id, job.GetStatus())
      logging.debug("Worker %s finished job %s, status = %s", *log_args)
216 e2715f69 Michael Hanselmann
217 e2715f69 Michael Hanselmann
218 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool of the job queue, made of _JobQueueWorker instances.

  """
  def __init__(self, context):
    """Set up the pool and remember the context for the workers.

    The workers read the context back through their pool when they
    create a processor.

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.context = context
223 e2715f69 Michael Hanselmann
224 e2715f69 Michael Hanselmann
225 f1da30e6 Michael Hanselmann
class JobStorage(object):
  """On-disk storage of jobs with an in-memory cache.

  Jobs are stored as one JSON file per job ("job-<id>") in
  constants.QUEUE_DIR. The queue directory is protected by a lock file
  which is held from construction until Close is called. Public methods
  are serialized through '_lock' (via utils.LockedMethod); methods named
  *Unlocked are meant to be called by those.

  """
  _RE_JOB_FILE = re.compile(r"^job-(\d+)$")

  def __init__(self):
    """Open the queue, creating and initializing it if necessary.

    Raises errors.JobQueueError if the on-disk queue version differs from
    constants.JOB_QUEUE_VERSION.

    """
    self._lock = threading.Lock()
    self._memcache = {}

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, stat.S_IRWXU)  # mode 0700
    except OSError:
      # sys.exc_info() instead of binding the exception in the except
      # clause keeps this parseable on both Python 2 and Python 3
      err = sys.exc_info()[1]
      if err.errno != errno.EEXIST:
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
      self._CheckVersionUnlocked()
      self._last_serial = self._ReadSerialUnlocked()
    except:
      # Don't keep the lock file open if the queue can't be used
      self.lock_fd.close()
      raise

  def _CheckVersionUnlocked(self):
    """Verify the queue version, initializing a new queue if needed.

    A missing version file means the queue doesn't exist yet; in that case
    the version and serial files are created first.

    """
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError:
      err = sys.exc_info()[1]
      if err.errno != errno.ENOENT:
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))
    finally:
      version_fd.close()

    # Verify version
    if version != constants.JOB_QUEUE_VERSION:
      raise errors.JobQueueError("Found version %s, expected %s" %
                                 (version, constants.JOB_QUEUE_VERSION))

  def _ReadSerialUnlocked(self):
    """Return the last used serial number as stored in the serial file."""
    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      return int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()

  def Close(self):
    """Close the lock file; the storage must not be used afterwards."""
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    """Write the version file and a serial file starting at 0."""
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: The new job identifier (an integer).

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial

  def _GetJobPath(self, job_id):
    """Return the path of the file holding the given job."""
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    # _ListJobFiles only returns names matching _RE_JOB_FILE, so the
    # match below can't fail
    return [int(self._RE_JOB_FILE.match(name).group(1))
            for name in self._ListJobFiles()]

  def _ListJobFiles(self):
    """Return the names of all job files in the queue directory."""
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Return the job with the given ID, or None if it has no file.

    Jobs are served from the memory cache when possible; jobs read from
    disk are added to the cache under the ID they were requested with.

    """
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      # %s rather than %d: the type of a caller-supplied ID isn't checked
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError:
      err = sys.exc_info()[1]
      if err.errno == errno.ENOENT:
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Load the given jobs, or all jobs if job_ids is empty or None.

    The result contains None for every requested job that doesn't exist.

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    """Locked wrapper around _GetJobsUnlocked."""
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    """Create a job from the given opcodes, store it and return it."""
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    """Write the job to its file, then prune the memory cache."""
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked(exceptions=[job.id])

  def _CleanCacheUnlocked(self, exceptions=None):
    """Clean the memory cache.

    Jobs which are neither queued nor running are dropped from the cache.

    The exceptions argument contains job IDs that should not be
    cleaned; None means no exceptions.

    """
    if exceptions is None:
      exceptions = []

    # Iterate over a copy as entries are removed inside the loop
    for key, job in list(self._memcache.items()):
      if job.id in exceptions:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        # Remove by cache key: a job is cached under the ID it was
        # requested with, which need not be the same object as job.id
        del self._memcache[key]

  @utils.LockedMethod
  def UpdateJob(self, job):
    """Locked wrapper around _UpdateJobUnlocked."""
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    """Not implemented yet."""
    raise NotImplementedError()
423 f1da30e6 Michael Hanselmann
424 f1da30e6 Michael Hanselmann
425 e2715f69 Michael Hanselmann
class JobQueue(object):
  """The job queue.

  Combines the job storage (JobStorage) with the pool of workers which
  execute the jobs (_JobQueueWorkerPool).

  """
  def __init__(self, context):
    """Open the job storage, start the workers and pick up stored jobs.

    Stored jobs that are still queued are handed to the worker pool.
    Jobs found in running state are marked as failed.

    Args:
    - context: Stored on the worker pool; the workers pass it on to
      mcpu.Processor

    """
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      if job is None:
        # GetJobs returns None for jobs it couldn't find, see QueryJobs
        continue

      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    Returns: The ID of the new job.

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  def ArchiveJob(self, job_id):
    """Not implemented yet."""
    raise NotImplementedError()

  def CancelJob(self, job_id):
    """Not implemented yet."""
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Return the values of the requested fields for one job.

    Args:
    - job: The job to describe
    - fields: Names of fields to return; known names are "id", "status",
      "ops", "opresult" and "opstatus"

    Returns: A list with one value per field, in the order of fields.
    Raises errors.OpExecError for an unknown field name.

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: A list with one entry per job: the list of field values, or
    None for a job that wasn't found.

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    Terminates the workers, then closes the job storage.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()