Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ fae737ac

History | View | Annotate | Download (17.8 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 f1da30e6 Michael Hanselmann
import os
25 e2715f69 Michael Hanselmann
import logging
26 e2715f69 Michael Hanselmann
import threading
27 f1da30e6 Michael Hanselmann
import errno
28 f1da30e6 Michael Hanselmann
import re
29 f1048938 Iustin Pop
import time
30 498ae1cc Iustin Pop
31 e2715f69 Michael Hanselmann
from ganeti import constants
32 f1da30e6 Michael Hanselmann
from ganeti import serializer
33 e2715f69 Michael Hanselmann
from ganeti import workerpool
34 f1da30e6 Michael Hanselmann
from ganeti import opcodes
35 7a1ecaed Iustin Pop
from ganeti import errors
36 e2715f69 Michael Hanselmann
from ganeti import mcpu
37 7996a135 Iustin Pop
from ganeti import utils
38 c3f0a12f Iustin Pop
from ganeti import rpc
39 e2715f69 Michael Hanselmann
40 e2715f69 Michael Hanselmann
41 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
42 e2715f69 Michael Hanselmann
43 498ae1cc Iustin Pop
44 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
45 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
46 e2715f69 Michael Hanselmann

47 307149a8 Iustin Pop
  Access is synchronized by the '_lock' attribute.
48 e2715f69 Michael Hanselmann

49 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
50 f1048938 Iustin Pop
  of the form (timestamp, level, message).
51 f1048938 Iustin Pop

52 e2715f69 Michael Hanselmann
  """
53 e2715f69 Michael Hanselmann
  def __init__(self, op):
54 f1048938 Iustin Pop
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
55 f1da30e6 Michael Hanselmann
56 f1048938 Iustin Pop
  def __Setup(self, input_, status, result, log):
57 307149a8 Iustin Pop
    self._lock = threading.Lock()
58 f1048938 Iustin Pop
    self.input = input_
59 f1da30e6 Michael Hanselmann
    self.status = status
60 f1da30e6 Michael Hanselmann
    self.result = result
61 f1048938 Iustin Pop
    self.log = log
62 f1da30e6 Michael Hanselmann
63 f1da30e6 Michael Hanselmann
  @classmethod
64 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
65 f1da30e6 Michael Hanselmann
    obj = object.__new__(cls)
66 f1da30e6 Michael Hanselmann
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67 f1048938 Iustin Pop
                state["status"], state["result"], state["log"])
68 f1da30e6 Michael Hanselmann
    return obj
69 f1da30e6 Michael Hanselmann
70 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
71 f1da30e6 Michael Hanselmann
  def Serialize(self):
72 f1da30e6 Michael Hanselmann
    return {
73 f1da30e6 Michael Hanselmann
      "input": self.input.__getstate__(),
74 f1da30e6 Michael Hanselmann
      "status": self.status,
75 f1da30e6 Michael Hanselmann
      "result": self.result,
76 f1048938 Iustin Pop
      "log": self.log,
77 f1da30e6 Michael Hanselmann
      }
78 307149a8 Iustin Pop
79 307149a8 Iustin Pop
  @utils.LockedMethod
80 af30b2fd Michael Hanselmann
  def GetInput(self):
81 af30b2fd Michael Hanselmann
    """Returns the original opcode.
82 af30b2fd Michael Hanselmann

83 af30b2fd Michael Hanselmann
    """
84 af30b2fd Michael Hanselmann
    return self.input
85 af30b2fd Michael Hanselmann
86 af30b2fd Michael Hanselmann
  @utils.LockedMethod
87 307149a8 Iustin Pop
  def SetStatus(self, status, result):
88 307149a8 Iustin Pop
    """Update the opcode status and result.
89 307149a8 Iustin Pop

90 307149a8 Iustin Pop
    """
91 307149a8 Iustin Pop
    self.status = status
92 307149a8 Iustin Pop
    self.result = result
93 307149a8 Iustin Pop
94 307149a8 Iustin Pop
  @utils.LockedMethod
95 307149a8 Iustin Pop
  def GetStatus(self):
96 307149a8 Iustin Pop
    """Get the opcode status.
97 307149a8 Iustin Pop

98 307149a8 Iustin Pop
    """
99 307149a8 Iustin Pop
    return self.status
100 307149a8 Iustin Pop
101 307149a8 Iustin Pop
  @utils.LockedMethod
102 307149a8 Iustin Pop
  def GetResult(self):
103 307149a8 Iustin Pop
    """Get the opcode result.
104 307149a8 Iustin Pop

105 307149a8 Iustin Pop
    """
106 307149a8 Iustin Pop
    return self.result
107 e2715f69 Michael Hanselmann
108 f1048938 Iustin Pop
  @utils.LockedMethod
109 f1048938 Iustin Pop
  def Log(self, *args):
110 f1048938 Iustin Pop
    """Append a log entry.
111 f1048938 Iustin Pop

112 f1048938 Iustin Pop
    """
113 f1048938 Iustin Pop
    assert len(args) < 2
114 f1048938 Iustin Pop
115 f1048938 Iustin Pop
    if len(args) == 1:
116 f1048938 Iustin Pop
      log_type = constants.ELOG_MESSAGE
117 f1048938 Iustin Pop
      log_msg = args[0]
118 f1048938 Iustin Pop
    else:
119 f1048938 Iustin Pop
      log_type, log_msg = args
120 f1048938 Iustin Pop
    self.log.append((time.time(), log_type, log_msg))
121 f1048938 Iustin Pop
122 f1048938 Iustin Pop
  @utils.LockedMethod
123 f1048938 Iustin Pop
  def RetrieveLog(self, start_at=0):
124 f1048938 Iustin Pop
    """Retrieve (a part of) the execution log.
125 f1048938 Iustin Pop

126 f1048938 Iustin Pop
    """
127 f1048938 Iustin Pop
    return self.log[start_at:]
128 f1048938 Iustin Pop
129 e2715f69 Michael Hanselmann
130 e2715f69 Michael Hanselmann
class _QueuedJob(object):
131 e2715f69 Michael Hanselmann
  """In-memory job representation.
132 e2715f69 Michael Hanselmann

133 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
134 e2715f69 Michael Hanselmann

135 e2715f69 Michael Hanselmann
  """
136 f1da30e6 Michael Hanselmann
  def __init__(self, storage, job_id, ops):
137 e2715f69 Michael Hanselmann
    if not ops:
138 e2715f69 Michael Hanselmann
      # TODO
139 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
140 e2715f69 Michael Hanselmann
141 f1048938 Iustin Pop
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142 e2715f69 Michael Hanselmann
143 f1048938 Iustin Pop
  def __Setup(self, storage, job_id, ops, run_op_index):
144 f1048938 Iustin Pop
    self._lock = threading.Lock()
145 f1da30e6 Michael Hanselmann
    self.storage = storage
146 f1da30e6 Michael Hanselmann
    self.id = job_id
147 f1da30e6 Michael Hanselmann
    self._ops = ops
148 f1048938 Iustin Pop
    self.run_op_index = run_op_index
149 f1da30e6 Michael Hanselmann
150 f1da30e6 Michael Hanselmann
  @classmethod
151 f1da30e6 Michael Hanselmann
  def Restore(cls, storage, state):
152 f1da30e6 Michael Hanselmann
    obj = object.__new__(cls)
153 f1048938 Iustin Pop
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154 f1048938 Iustin Pop
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
155 f1da30e6 Michael Hanselmann
    return obj
156 f1da30e6 Michael Hanselmann
157 f1da30e6 Michael Hanselmann
  def Serialize(self):
158 f1da30e6 Michael Hanselmann
    return {
159 f1da30e6 Michael Hanselmann
      "id": self.id,
160 f1da30e6 Michael Hanselmann
      "ops": [op.Serialize() for op in self._ops],
161 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
162 f1da30e6 Michael Hanselmann
      }
163 f1da30e6 Michael Hanselmann
164 f1da30e6 Michael Hanselmann
  def SetUnclean(self, msg):
165 f1da30e6 Michael Hanselmann
    try:
166 f1da30e6 Michael Hanselmann
      for op in self._ops:
167 f1da30e6 Michael Hanselmann
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
168 f1da30e6 Michael Hanselmann
    finally:
169 f1da30e6 Michael Hanselmann
      self.storage.UpdateJob(self)
170 e2715f69 Michael Hanselmann
171 307149a8 Iustin Pop
  def GetStatus(self):
172 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
173 e2715f69 Michael Hanselmann
174 e2715f69 Michael Hanselmann
    all_success = True
175 e2715f69 Michael Hanselmann
    for op in self._ops:
176 307149a8 Iustin Pop
      op_status = op.GetStatus()
177 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_SUCCESS:
178 e2715f69 Michael Hanselmann
        continue
179 e2715f69 Michael Hanselmann
180 e2715f69 Michael Hanselmann
      all_success = False
181 e2715f69 Michael Hanselmann
182 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_QUEUED:
183 e2715f69 Michael Hanselmann
        pass
184 307149a8 Iustin Pop
      elif op_status == constants.OP_STATUS_RUNNING:
185 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
186 f1da30e6 Michael Hanselmann
      elif op_status == constants.OP_STATUS_ERROR:
187 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
188 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
189 f1da30e6 Michael Hanselmann
        break
190 e2715f69 Michael Hanselmann
191 e2715f69 Michael Hanselmann
    if all_success:
192 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
193 e2715f69 Michael Hanselmann
194 e2715f69 Michael Hanselmann
    return status
195 e2715f69 Michael Hanselmann
196 f1048938 Iustin Pop
  @utils.LockedMethod
197 f1048938 Iustin Pop
  def GetRunOpIndex(self):
198 f1048938 Iustin Pop
    return self.run_op_index
199 f1048938 Iustin Pop
200 e2715f69 Michael Hanselmann
  def Run(self, proc):
201 e2715f69 Michael Hanselmann
    """Job executor.
202 e2715f69 Michael Hanselmann

203 e2715f69 Michael Hanselmann
    This functions processes a this job in the context of given processor
204 e2715f69 Michael Hanselmann
    instance.
205 e2715f69 Michael Hanselmann

206 e2715f69 Michael Hanselmann
    Args:
207 e2715f69 Michael Hanselmann
    - proc: Ganeti Processor to run the job with
208 e2715f69 Michael Hanselmann

209 e2715f69 Michael Hanselmann
    """
210 e2715f69 Michael Hanselmann
    try:
211 c8549bfd Michael Hanselmann
      count = len(self._ops)
212 c8549bfd Michael Hanselmann
      for idx, op in enumerate(self._ops):
213 e2715f69 Michael Hanselmann
        try:
214 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
215 f1048938 Iustin Pop
216 f1048938 Iustin Pop
          self._lock.acquire()
217 f1048938 Iustin Pop
          try:
218 f1048938 Iustin Pop
            self.run_op_index = idx
219 f1048938 Iustin Pop
          finally:
220 f1048938 Iustin Pop
            self._lock.release()
221 f1048938 Iustin Pop
222 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
223 f1da30e6 Michael Hanselmann
          self.storage.UpdateJob(self)
224 e2715f69 Michael Hanselmann
225 f1048938 Iustin Pop
          result = proc.ExecOpCode(op.input, op.Log)
226 e2715f69 Michael Hanselmann
227 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
228 f1da30e6 Michael Hanselmann
          self.storage.UpdateJob(self)
229 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Successfully finished %s",
230 307149a8 Iustin Pop
                        idx + 1, count, op)
231 e2715f69 Michael Hanselmann
        except Exception, err:
232 f1da30e6 Michael Hanselmann
          try:
233 f1da30e6 Michael Hanselmann
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
234 f1da30e6 Michael Hanselmann
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
235 f1da30e6 Michael Hanselmann
          finally:
236 f1da30e6 Michael Hanselmann
            self.storage.UpdateJob(self)
237 e2715f69 Michael Hanselmann
          raise
238 e2715f69 Michael Hanselmann
239 e2715f69 Michael Hanselmann
    except errors.GenericError, err:
240 e2715f69 Michael Hanselmann
      logging.error("ganeti exception %s", exc_info=err)
241 e2715f69 Michael Hanselmann
    except Exception, err:
242 e2715f69 Michael Hanselmann
      logging.error("unhandled exception %s", exc_info=err)
243 e2715f69 Michael Hanselmann
    except:
244 e2715f69 Michael Hanselmann
      logging.error("unhandled unknown exception %s", exc_info=err)
245 e2715f69 Michael Hanselmann
246 e2715f69 Michael Hanselmann
247 e2715f69 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
248 e2715f69 Michael Hanselmann
  def RunTask(self, job):
249 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
250 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
251 e2715f69 Michael Hanselmann
    # TODO: feedback function
252 f1048938 Iustin Pop
    proc = mcpu.Processor(self.pool.context)
253 e2715f69 Michael Hanselmann
    try:
254 e2715f69 Michael Hanselmann
      job.Run(proc)
255 e2715f69 Michael Hanselmann
    finally:
256 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
257 e2715f69 Michael Hanselmann
                    self.worker_id, job.id, job.GetStatus())
258 e2715f69 Michael Hanselmann
259 e2715f69 Michael Hanselmann
260 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
261 e2715f69 Michael Hanselmann
  def __init__(self, context):
262 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
263 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
264 e2715f69 Michael Hanselmann
    self.context = context
265 e2715f69 Michael Hanselmann
266 e2715f69 Michael Hanselmann
267 ce594241 Michael Hanselmann
class JobStorageBase(object):
268 ce594241 Michael Hanselmann
  def __init__(self, id_prefix):
269 ce594241 Michael Hanselmann
    self.id_prefix = id_prefix
270 ce594241 Michael Hanselmann
271 ce594241 Michael Hanselmann
    if id_prefix:
272 ce594241 Michael Hanselmann
      prefix_pattern = re.escape("%s-" % id_prefix)
273 ce594241 Michael Hanselmann
    else:
274 ce594241 Michael Hanselmann
      prefix_pattern = ""
275 ce594241 Michael Hanselmann
276 ce594241 Michael Hanselmann
    # Apart from the prefix, all job IDs are numeric
277 ce594241 Michael Hanselmann
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
278 ce594241 Michael Hanselmann
279 ce594241 Michael Hanselmann
  def OwnsJobId(self, job_id):
280 ce594241 Michael Hanselmann
    return self._re_job_id.match(job_id)
281 ce594241 Michael Hanselmann
282 ce594241 Michael Hanselmann
  def FormatJobID(self, job_id):
283 ce594241 Michael Hanselmann
    if not isinstance(job_id, (int, long)):
284 ce594241 Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
285 ce594241 Michael Hanselmann
    if job_id < 0:
286 ce594241 Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
287 ce594241 Michael Hanselmann
288 ce594241 Michael Hanselmann
    if self.id_prefix:
289 ce594241 Michael Hanselmann
      prefix = "%s-" % self.id_prefix
290 ce594241 Michael Hanselmann
    else:
291 ce594241 Michael Hanselmann
      prefix = ""
292 ce594241 Michael Hanselmann
293 ce594241 Michael Hanselmann
    return "%s%010d" % (prefix, job_id)
294 ce594241 Michael Hanselmann
295 c609f802 Michael Hanselmann
  def _ShouldJobBeArchivedUnlocked(self, job):
296 c609f802 Michael Hanselmann
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
297 c609f802 Michael Hanselmann
                               constants.JOB_STATUS_SUCCESS,
298 c609f802 Michael Hanselmann
                               constants.JOB_STATUS_ERROR):
299 c609f802 Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
300 c609f802 Michael Hanselmann
      return False
301 c609f802 Michael Hanselmann
    return True
302 c609f802 Michael Hanselmann
303 ce594241 Michael Hanselmann
304 ce594241 Michael Hanselmann
class DiskJobStorage(JobStorageBase):
305 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
306 f1da30e6 Michael Hanselmann
307 ce594241 Michael Hanselmann
  def __init__(self, id_prefix):
308 ce594241 Michael Hanselmann
    JobStorageBase.__init__(self, id_prefix)
309 ce594241 Michael Hanselmann
310 f1da30e6 Michael Hanselmann
    self._lock = threading.Lock()
311 ac0930b9 Iustin Pop
    self._memcache = {}
312 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
313 f1da30e6 Michael Hanselmann
314 0cb94105 Michael Hanselmann
    # Make sure our directories exists
315 0cb94105 Michael Hanselmann
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
316 0cb94105 Michael Hanselmann
      try:
317 0cb94105 Michael Hanselmann
        os.mkdir(path, 0700)
318 0cb94105 Michael Hanselmann
      except OSError, err:
319 0cb94105 Michael Hanselmann
        if err.errno not in (errno.EEXIST, ):
320 0cb94105 Michael Hanselmann
          raise
321 f1da30e6 Michael Hanselmann
322 f1da30e6 Michael Hanselmann
    # Get queue lock
323 f1da30e6 Michael Hanselmann
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
324 f1da30e6 Michael Hanselmann
    try:
325 f1da30e6 Michael Hanselmann
      utils.LockFile(self.lock_fd)
326 f1da30e6 Michael Hanselmann
    except:
327 f1da30e6 Michael Hanselmann
      self.lock_fd.close()
328 f1da30e6 Michael Hanselmann
      raise
329 f1da30e6 Michael Hanselmann
330 f1da30e6 Michael Hanselmann
    # Read version
331 f1da30e6 Michael Hanselmann
    try:
332 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
333 f1da30e6 Michael Hanselmann
    except IOError, err:
334 f1da30e6 Michael Hanselmann
      if err.errno not in (errno.ENOENT, ):
335 f1da30e6 Michael Hanselmann
        raise
336 f1da30e6 Michael Hanselmann
337 f1da30e6 Michael Hanselmann
      # Setup a new queue
338 f1da30e6 Michael Hanselmann
      self._InitQueueUnlocked()
339 f1da30e6 Michael Hanselmann
340 f1da30e6 Michael Hanselmann
      # Try to open again
341 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
342 f1da30e6 Michael Hanselmann
343 f1da30e6 Michael Hanselmann
    try:
344 f1da30e6 Michael Hanselmann
      # Try to read version
345 f1da30e6 Michael Hanselmann
      version = int(version_fd.read(128))
346 f1da30e6 Michael Hanselmann
347 f1da30e6 Michael Hanselmann
      # Verify version
348 f1da30e6 Michael Hanselmann
      if version != constants.JOB_QUEUE_VERSION:
349 f1da30e6 Michael Hanselmann
        raise errors.JobQueueError("Found version %s, expected %s",
350 f1da30e6 Michael Hanselmann
                                   version, constants.JOB_QUEUE_VERSION)
351 f1da30e6 Michael Hanselmann
    finally:
352 f1da30e6 Michael Hanselmann
      version_fd.close()
353 f1da30e6 Michael Hanselmann
354 c4beba1c Iustin Pop
    self._last_serial = self._ReadSerial()
355 c4beba1c Iustin Pop
    if self._last_serial is None:
356 c4beba1c Iustin Pop
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
357 c4beba1c Iustin Pop
                                      " file")
358 c4beba1c Iustin Pop
359 c4beba1c Iustin Pop
  @staticmethod
360 c4beba1c Iustin Pop
  def _ReadSerial():
361 c4beba1c Iustin Pop
    """Try to read the job serial file.
362 c4beba1c Iustin Pop

363 c4beba1c Iustin Pop
    @rtype: None or int
364 c4beba1c Iustin Pop
    @return: If the serial can be read, then it is returned. Otherwise None
365 c4beba1c Iustin Pop
             is returned.
366 c4beba1c Iustin Pop

367 c4beba1c Iustin Pop
    """
368 f1da30e6 Michael Hanselmann
    try:
369 c4beba1c Iustin Pop
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
370 c4beba1c Iustin Pop
      try:
371 c4beba1c Iustin Pop
        # Read last serial
372 c4beba1c Iustin Pop
        serial = int(serial_fd.read(1024).strip())
373 c4beba1c Iustin Pop
      finally:
374 c4beba1c Iustin Pop
        serial_fd.close()
375 c4beba1c Iustin Pop
    except (ValueError, EnvironmentError):
376 c4beba1c Iustin Pop
      serial = None
377 c4beba1c Iustin Pop
378 c4beba1c Iustin Pop
    return serial
379 f1da30e6 Michael Hanselmann
380 f1da30e6 Michael Hanselmann
  def Close(self):
381 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
382 f1da30e6 Michael Hanselmann
383 f1da30e6 Michael Hanselmann
    self.lock_fd.close()
384 f1da30e6 Michael Hanselmann
    self.lock_fd = None
385 f1da30e6 Michael Hanselmann
386 f1da30e6 Michael Hanselmann
  def _InitQueueUnlocked(self):
387 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
388 f1da30e6 Michael Hanselmann
389 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
390 f1da30e6 Michael Hanselmann
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
391 c4beba1c Iustin Pop
    if self._ReadSerial() is None:
392 c4beba1c Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
393 c4beba1c Iustin Pop
                      data="%s\n" % 0)
394 f1da30e6 Michael Hanselmann
395 c3f0a12f Iustin Pop
  def _NewSerialUnlocked(self, nodes):
396 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
397 f1da30e6 Michael Hanselmann

398 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
399 f1da30e6 Michael Hanselmann

400 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
401 f1da30e6 Michael Hanselmann

402 f1da30e6 Michael Hanselmann
    """
403 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
404 f1da30e6 Michael Hanselmann
405 f1da30e6 Michael Hanselmann
    # New number
406 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
407 f1da30e6 Michael Hanselmann
408 f1da30e6 Michael Hanselmann
    # Write to file
409 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
410 f1da30e6 Michael Hanselmann
                    data="%s\n" % serial)
411 f1da30e6 Michael Hanselmann
412 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
413 f1da30e6 Michael Hanselmann
    self._last_serial = serial
414 f1da30e6 Michael Hanselmann
415 c3f0a12f Iustin Pop
    # Distribute the serial to the other nodes
416 c3f0a12f Iustin Pop
    try:
417 c3f0a12f Iustin Pop
      nodes.remove(self._my_hostname)
418 c3f0a12f Iustin Pop
    except ValueError:
419 c3f0a12f Iustin Pop
      pass
420 c3f0a12f Iustin Pop
421 c3f0a12f Iustin Pop
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
422 c3f0a12f Iustin Pop
    for node in nodes:
423 c3f0a12f Iustin Pop
      if not result[node]:
424 c3f0a12f Iustin Pop
        logging.error("copy of job queue file to node %s failed", node)
425 c3f0a12f Iustin Pop
426 ce594241 Michael Hanselmann
    return self.FormatJobID(serial)
427 f1da30e6 Michael Hanselmann
428 f1da30e6 Michael Hanselmann
  def _GetJobPath(self, job_id):
429 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
430 f1da30e6 Michael Hanselmann
431 0cb94105 Michael Hanselmann
  def _GetArchivedJobPath(self, job_id):
432 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
433 0cb94105 Michael Hanselmann
434 fae737ac Michael Hanselmann
  def _ExtractJobID(self, name):
435 fae737ac Michael Hanselmann
    m = self._RE_JOB_FILE.match(name)
436 fae737ac Michael Hanselmann
    if m:
437 fae737ac Michael Hanselmann
      return m.group(1)
438 fae737ac Michael Hanselmann
    else:
439 fae737ac Michael Hanselmann
      return None
440 fae737ac Michael Hanselmann
441 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
442 911a495b Iustin Pop
    """Return all known job IDs.
443 911a495b Iustin Pop

444 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
445 911a495b Iustin Pop
    included. Currently this argument is unused.
446 911a495b Iustin Pop

447 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
448 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
449 ac0930b9 Iustin Pop
    extra IDs).
450 ac0930b9 Iustin Pop

451 911a495b Iustin Pop
    """
452 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
453 f0d874fe Iustin Pop
    jlist.sort()
454 f0d874fe Iustin Pop
    return jlist
455 911a495b Iustin Pop
456 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
457 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
458 f1da30e6 Michael Hanselmann
459 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
460 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
461 f1da30e6 Michael Hanselmann
462 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
463 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
464 f1da30e6 Michael Hanselmann
465 ac0930b9 Iustin Pop
    if job_id in self._memcache:
466 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
467 ac0930b9 Iustin Pop
      return self._memcache[job_id]
468 ac0930b9 Iustin Pop
469 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
470 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
471 f1da30e6 Michael Hanselmann
    try:
472 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
473 f1da30e6 Michael Hanselmann
    except IOError, err:
474 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
475 f1da30e6 Michael Hanselmann
        return None
476 f1da30e6 Michael Hanselmann
      raise
477 f1da30e6 Michael Hanselmann
    try:
478 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
479 f1da30e6 Michael Hanselmann
    finally:
480 f1da30e6 Michael Hanselmann
      fd.close()
481 f1da30e6 Michael Hanselmann
482 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
483 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
484 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
485 ac0930b9 Iustin Pop
    return job
486 f1da30e6 Michael Hanselmann
487 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
488 911a495b Iustin Pop
    if not job_ids:
489 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
490 f1da30e6 Michael Hanselmann
491 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
492 f1da30e6 Michael Hanselmann
493 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
494 f1da30e6 Michael Hanselmann
  def GetJobs(self, job_ids):
495 f1da30e6 Michael Hanselmann
    return self._GetJobsUnlocked(job_ids)
496 f1da30e6 Michael Hanselmann
497 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
498 c3f0a12f Iustin Pop
  def AddJob(self, ops, nodes):
499 c3f0a12f Iustin Pop
    """Create and store on disk a new job.
500 c3f0a12f Iustin Pop

501 c3f0a12f Iustin Pop
    @type ops: list
502 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
503 c3f0a12f Iustin Pop
    @type nodes: list
504 c3f0a12f Iustin Pop
    @param nodes: The list of nodes to which the new job serial will be
505 c3f0a12f Iustin Pop
                  distributed.
506 c3f0a12f Iustin Pop

507 c3f0a12f Iustin Pop
    """
508 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
509 f1da30e6 Michael Hanselmann
510 f1da30e6 Michael Hanselmann
    # Get job identifier
511 c3f0a12f Iustin Pop
    job_id = self._NewSerialUnlocked(nodes)
512 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
513 f1da30e6 Michael Hanselmann
514 f1da30e6 Michael Hanselmann
    # Write to disk
515 f1da30e6 Michael Hanselmann
    self._UpdateJobUnlocked(job)
516 f1da30e6 Michael Hanselmann
517 205d71fd Michael Hanselmann
    logging.debug("Added new job %s to the cache", job_id)
518 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
519 ac0930b9 Iustin Pop
520 f1da30e6 Michael Hanselmann
    return job
521 f1da30e6 Michael Hanselmann
522 f1da30e6 Michael Hanselmann
  def _UpdateJobUnlocked(self, job):
523 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
524 f1da30e6 Michael Hanselmann
525 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
526 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
527 f1da30e6 Michael Hanselmann
    utils.WriteFile(filename,
528 f1da30e6 Michael Hanselmann
                    data=serializer.DumpJson(job.Serialize(), indent=False))
529 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
530 ac0930b9 Iustin Pop
531 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
532 ac0930b9 Iustin Pop
    """Clean the memory cache.
533 ac0930b9 Iustin Pop

534 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
535 ac0930b9 Iustin Pop
    cleaned.
536 ac0930b9 Iustin Pop

537 ac0930b9 Iustin Pop
    """
538 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
539 ac0930b9 Iustin Pop
    for job in self._memcache.values():
540 57f8615f Michael Hanselmann
      if job.id in exclude:
541 ac0930b9 Iustin Pop
        continue
542 ac0930b9 Iustin Pop
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
543 ac0930b9 Iustin Pop
                                 constants.JOB_STATUS_RUNNING):
544 205d71fd Michael Hanselmann
        logging.debug("Cleaning job %s from the cache", job.id)
545 ac0930b9 Iustin Pop
        try:
546 ac0930b9 Iustin Pop
          del self._memcache[job.id]
547 ac0930b9 Iustin Pop
        except KeyError:
548 ac0930b9 Iustin Pop
          pass
549 f1da30e6 Michael Hanselmann
550 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
551 f1da30e6 Michael Hanselmann
  def UpdateJob(self, job):
552 f1da30e6 Michael Hanselmann
    return self._UpdateJobUnlocked(job)
553 f1da30e6 Michael Hanselmann
554 c609f802 Michael Hanselmann
  @utils.LockedMethod
555 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
556 c609f802 Michael Hanselmann
    """Archives a job.
557 c609f802 Michael Hanselmann

558 c609f802 Michael Hanselmann
    @type job_id: string
559 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
560 c609f802 Michael Hanselmann

561 c609f802 Michael Hanselmann
    """
562 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
563 c609f802 Michael Hanselmann
564 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
565 c609f802 Michael Hanselmann
    if not job:
566 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
567 c609f802 Michael Hanselmann
      return
568 c609f802 Michael Hanselmann
569 c609f802 Michael Hanselmann
    if not self._ShouldJobBeArchivedUnlocked(job):
570 c609f802 Michael Hanselmann
      return
571 c609f802 Michael Hanselmann
572 c609f802 Michael Hanselmann
    try:
573 c609f802 Michael Hanselmann
      old = self._GetJobPath(job.id)
574 c609f802 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
575 c609f802 Michael Hanselmann
576 c609f802 Michael Hanselmann
      os.rename(old, new)
577 c609f802 Michael Hanselmann
578 c609f802 Michael Hanselmann
      logging.debug("Successfully archived job %s", job.id)
579 c609f802 Michael Hanselmann
    finally:
580 c609f802 Michael Hanselmann
      # Cleaning the cache because we don't know what os.rename actually did
581 c609f802 Michael Hanselmann
      # and to be on the safe side.
582 c609f802 Michael Hanselmann
      self._CleanCacheUnlocked([])
583 f1da30e6 Michael Hanselmann
584 f1da30e6 Michael Hanselmann
585 e2715f69 Michael Hanselmann
class JobQueue:
586 e2715f69 Michael Hanselmann
  """The job queue.
587 e2715f69 Michael Hanselmann

588 ce594241 Michael Hanselmann
  """
589 e2715f69 Michael Hanselmann
  def __init__(self, context):
590 e2715f69 Michael Hanselmann
    self._lock = threading.Lock()
591 ce594241 Michael Hanselmann
    self._jobs = DiskJobStorage("")
592 e2715f69 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(context)
593 e2715f69 Michael Hanselmann
594 f1da30e6 Michael Hanselmann
    for job in self._jobs.GetJobs(None):
595 f1da30e6 Michael Hanselmann
      status = job.GetStatus()
596 f1da30e6 Michael Hanselmann
      if status in (constants.JOB_STATUS_QUEUED, ):
597 f1da30e6 Michael Hanselmann
        self._wpool.AddTask(job)
598 e2715f69 Michael Hanselmann
599 f1da30e6 Michael Hanselmann
      elif status in (constants.JOB_STATUS_RUNNING, ):
600 f1da30e6 Michael Hanselmann
        logging.warning("Unfinished job %s found: %s", job.id, job)
601 f1da30e6 Michael Hanselmann
        job.SetUnclean("Unclean master daemon shutdown")
602 e2715f69 Michael Hanselmann
603 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
604 c3f0a12f Iustin Pop
  def SubmitJob(self, ops, nodes):
605 e2715f69 Michael Hanselmann
    """Add a new job to the queue.
606 e2715f69 Michael Hanselmann

607 e2715f69 Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
608 e2715f69 Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
609 e2715f69 Michael Hanselmann

610 c3f0a12f Iustin Pop
    @type ops: list
611 c3f0a12f Iustin Pop
    @param ops: the sequence of opcodes that will become the new job
612 c3f0a12f Iustin Pop
    @type nodes: list
613 c3f0a12f Iustin Pop
    @param nodes: the list of nodes to which the queue should be
614 c3f0a12f Iustin Pop
                  distributed
615 e2715f69 Michael Hanselmann

616 e2715f69 Michael Hanselmann
    """
617 c3f0a12f Iustin Pop
    job = self._jobs.AddJob(ops, nodes)
618 e2715f69 Michael Hanselmann
619 e2715f69 Michael Hanselmann
    # Add to worker pool
620 e2715f69 Michael Hanselmann
    self._wpool.AddTask(job)
621 e2715f69 Michael Hanselmann
622 f1da30e6 Michael Hanselmann
    return job.id
623 e2715f69 Michael Hanselmann
624 e2715f69 Michael Hanselmann
  def ArchiveJob(self, job_id):
625 c609f802 Michael Hanselmann
    self._jobs.ArchiveJob(job_id)
626 e2715f69 Michael Hanselmann
627 e2715f69 Michael Hanselmann
  def CancelJob(self, job_id):
628 e2715f69 Michael Hanselmann
    raise NotImplementedError()
629 e2715f69 Michael Hanselmann
630 e2715f69 Michael Hanselmann
  def _GetJobInfo(self, job, fields):
631 e2715f69 Michael Hanselmann
    row = []
632 e2715f69 Michael Hanselmann
    for fname in fields:
633 e2715f69 Michael Hanselmann
      if fname == "id":
634 e2715f69 Michael Hanselmann
        row.append(job.id)
635 e2715f69 Michael Hanselmann
      elif fname == "status":
636 e2715f69 Michael Hanselmann
        row.append(job.GetStatus())
637 af30b2fd Michael Hanselmann
      elif fname == "ops":
638 af30b2fd Michael Hanselmann
        row.append([op.GetInput().__getstate__() for op in job._ops])
639 af30b2fd Michael Hanselmann
      elif fname == "opresult":
640 307149a8 Iustin Pop
        row.append([op.GetResult() for op in job._ops])
641 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
642 af30b2fd Michael Hanselmann
        row.append([op.GetStatus() for op in job._ops])
643 f1048938 Iustin Pop
      elif fname == "ticker":
644 f1048938 Iustin Pop
        ji = job.GetRunOpIndex()
645 f1048938 Iustin Pop
        if ji < 0:
646 f1048938 Iustin Pop
          lmsg = None
647 f1048938 Iustin Pop
        else:
648 f1048938 Iustin Pop
          lmsg = job._ops[ji].RetrieveLog(-1)
649 f1048938 Iustin Pop
          # message might be empty here
650 f1048938 Iustin Pop
          if lmsg:
651 f1048938 Iustin Pop
            lmsg = lmsg[0]
652 f1048938 Iustin Pop
          else:
653 f1048938 Iustin Pop
            lmsg = None
654 f1048938 Iustin Pop
        row.append(lmsg)
655 e2715f69 Michael Hanselmann
      else:
656 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
657 e2715f69 Michael Hanselmann
    return row
658 e2715f69 Michael Hanselmann
659 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
660 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
661 e2715f69 Michael Hanselmann

662 e2715f69 Michael Hanselmann
    Args:
663 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
664 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
665 e2715f69 Michael Hanselmann

666 e2715f69 Michael Hanselmann
    """
667 e2715f69 Michael Hanselmann
    self._lock.acquire()
668 e2715f69 Michael Hanselmann
    try:
669 e2715f69 Michael Hanselmann
      jobs = []
670 e2715f69 Michael Hanselmann
671 f1da30e6 Michael Hanselmann
      for job in self._jobs.GetJobs(job_ids):
672 e2715f69 Michael Hanselmann
        if job is None:
673 e2715f69 Michael Hanselmann
          jobs.append(None)
674 e2715f69 Michael Hanselmann
        else:
675 e2715f69 Michael Hanselmann
          jobs.append(self._GetJobInfo(job, fields))
676 e2715f69 Michael Hanselmann
677 e2715f69 Michael Hanselmann
      return jobs
678 e2715f69 Michael Hanselmann
    finally:
679 e2715f69 Michael Hanselmann
      self._lock.release()
680 e2715f69 Michael Hanselmann
681 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
682 e2715f69 Michael Hanselmann
  def Shutdown(self):
683 e2715f69 Michael Hanselmann
    """Stops the job queue.
684 e2715f69 Michael Hanselmann

685 e2715f69 Michael Hanselmann
    """
686 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
687 f1da30e6 Michael Hanselmann
    self._jobs.Close()