Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 64381ad7

History | View | Annotate | Download (17.1 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 f1da30e6 Michael Hanselmann
import os
25 e2715f69 Michael Hanselmann
import logging
26 e2715f69 Michael Hanselmann
import threading
27 f1da30e6 Michael Hanselmann
import errno
28 f1da30e6 Michael Hanselmann
import re
29 f1048938 Iustin Pop
import time
30 498ae1cc Iustin Pop
31 e2715f69 Michael Hanselmann
from ganeti import constants
32 f1da30e6 Michael Hanselmann
from ganeti import serializer
33 e2715f69 Michael Hanselmann
from ganeti import workerpool
34 f1da30e6 Michael Hanselmann
from ganeti import opcodes
35 7a1ecaed Iustin Pop
from ganeti import errors
36 e2715f69 Michael Hanselmann
from ganeti import mcpu
37 7996a135 Iustin Pop
from ganeti import utils
38 c3f0a12f Iustin Pop
from ganeti import rpc
39 e2715f69 Michael Hanselmann
40 e2715f69 Michael Hanselmann
41 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
42 e2715f69 Michael Hanselmann
43 498ae1cc Iustin Pop
44 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
45 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
46 e2715f69 Michael Hanselmann

47 307149a8 Iustin Pop
  Access is synchronized by the '_lock' attribute.
48 e2715f69 Michael Hanselmann

49 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
50 f1048938 Iustin Pop
  of the form (timestamp, level, message).
51 f1048938 Iustin Pop

52 e2715f69 Michael Hanselmann
  """
53 85f03e0d Michael Hanselmann
  def __new__(cls, *args, **kwargs):
54 85f03e0d Michael Hanselmann
    obj = object.__new__(cls, *args, **kwargs)
55 85f03e0d Michael Hanselmann
    # Create a special lock for logging
56 85f03e0d Michael Hanselmann
    obj._log_lock = threading.Lock()
57 85f03e0d Michael Hanselmann
    return obj
58 f1da30e6 Michael Hanselmann
59 85f03e0d Michael Hanselmann
  def __init__(self, op):
60 85f03e0d Michael Hanselmann
    self.input = op
61 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
62 85f03e0d Michael Hanselmann
    self.result = None
63 85f03e0d Michael Hanselmann
    self.log = []
64 f1da30e6 Michael Hanselmann
65 f1da30e6 Michael Hanselmann
  @classmethod
66 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
67 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
68 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69 85f03e0d Michael Hanselmann
    obj.status = state["status"]
70 85f03e0d Michael Hanselmann
    obj.result = state["result"]
71 85f03e0d Michael Hanselmann
    obj.log = state["log"]
72 f1da30e6 Michael Hanselmann
    return obj
73 f1da30e6 Michael Hanselmann
74 f1da30e6 Michael Hanselmann
  def Serialize(self):
75 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
76 85f03e0d Michael Hanselmann
    try:
77 85f03e0d Michael Hanselmann
      return {
78 85f03e0d Michael Hanselmann
        "input": self.input.__getstate__(),
79 85f03e0d Michael Hanselmann
        "status": self.status,
80 85f03e0d Michael Hanselmann
        "result": self.result,
81 85f03e0d Michael Hanselmann
        "log": self.log,
82 85f03e0d Michael Hanselmann
        }
83 85f03e0d Michael Hanselmann
    finally:
84 85f03e0d Michael Hanselmann
      self._log_lock.release()
85 e2715f69 Michael Hanselmann
86 f1048938 Iustin Pop
  def Log(self, *args):
87 f1048938 Iustin Pop
    """Append a log entry.
88 f1048938 Iustin Pop

89 f1048938 Iustin Pop
    """
90 85f03e0d Michael Hanselmann
    assert len(args) < 3
91 f1048938 Iustin Pop
92 f1048938 Iustin Pop
    if len(args) == 1:
93 f1048938 Iustin Pop
      log_type = constants.ELOG_MESSAGE
94 f1048938 Iustin Pop
      log_msg = args[0]
95 f1048938 Iustin Pop
    else:
96 f1048938 Iustin Pop
      log_type, log_msg = args
97 f1048938 Iustin Pop
98 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
99 85f03e0d Michael Hanselmann
    try:
100 85f03e0d Michael Hanselmann
      self.log.append((time.time(), log_type, log_msg))
101 85f03e0d Michael Hanselmann
    finally:
102 85f03e0d Michael Hanselmann
      self._log_lock.release()
103 85f03e0d Michael Hanselmann
104 f1048938 Iustin Pop
  def RetrieveLog(self, start_at=0):
105 f1048938 Iustin Pop
    """Retrieve (a part of) the execution log.
106 f1048938 Iustin Pop

107 f1048938 Iustin Pop
    """
108 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
109 85f03e0d Michael Hanselmann
    try:
110 85f03e0d Michael Hanselmann
      return self.log[start_at:]
111 85f03e0d Michael Hanselmann
    finally:
112 85f03e0d Michael Hanselmann
      self._log_lock.release()
113 f1048938 Iustin Pop
114 e2715f69 Michael Hanselmann
115 e2715f69 Michael Hanselmann
class _QueuedJob(object):
116 e2715f69 Michael Hanselmann
  """In-memory job representation.
117 e2715f69 Michael Hanselmann

118 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
119 e2715f69 Michael Hanselmann

120 e2715f69 Michael Hanselmann
  """
121 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
122 e2715f69 Michael Hanselmann
    if not ops:
123 e2715f69 Michael Hanselmann
      # TODO
124 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
125 e2715f69 Michael Hanselmann
126 85f03e0d Michael Hanselmann
    self.queue = queue
127 f1da30e6 Michael Hanselmann
    self.id = job_id
128 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
129 85f03e0d Michael Hanselmann
    self.run_op_index = -1
130 f1da30e6 Michael Hanselmann
131 f1da30e6 Michael Hanselmann
  @classmethod
132 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
133 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
134 85f03e0d Michael Hanselmann
    obj.queue = queue
135 85f03e0d Michael Hanselmann
    obj.id = state["id"]
136 85f03e0d Michael Hanselmann
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
138 f1da30e6 Michael Hanselmann
    return obj
139 f1da30e6 Michael Hanselmann
140 f1da30e6 Michael Hanselmann
  def Serialize(self):
141 f1da30e6 Michael Hanselmann
    return {
142 f1da30e6 Michael Hanselmann
      "id": self.id,
143 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
144 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
145 f1da30e6 Michael Hanselmann
      }
146 f1da30e6 Michael Hanselmann
147 85f03e0d Michael Hanselmann
  def CalcStatus(self):
148 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
149 e2715f69 Michael Hanselmann
150 e2715f69 Michael Hanselmann
    all_success = True
151 85f03e0d Michael Hanselmann
    for op in self.ops:
152 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
153 e2715f69 Michael Hanselmann
        continue
154 e2715f69 Michael Hanselmann
155 e2715f69 Michael Hanselmann
      all_success = False
156 e2715f69 Michael Hanselmann
157 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
158 e2715f69 Michael Hanselmann
        pass
159 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
160 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
161 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
162 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
163 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
164 f1da30e6 Michael Hanselmann
        break
165 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
166 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
167 4cb1d919 Michael Hanselmann
        break
168 e2715f69 Michael Hanselmann
169 e2715f69 Michael Hanselmann
    if all_success:
170 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
171 e2715f69 Michael Hanselmann
172 e2715f69 Michael Hanselmann
    return status
173 e2715f69 Michael Hanselmann
174 f1048938 Iustin Pop
175 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
176 85f03e0d Michael Hanselmann
  def RunTask(self, job):
177 e2715f69 Michael Hanselmann
    """Job executor.
178 e2715f69 Michael Hanselmann

179 85f03e0d Michael Hanselmann
    This functions processes a job.
180 e2715f69 Michael Hanselmann

181 e2715f69 Michael Hanselmann
    """
182 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
183 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
184 f1048938 Iustin Pop
    proc = mcpu.Processor(self.pool.context)
185 85f03e0d Michael Hanselmann
    queue = job.queue
186 e2715f69 Michael Hanselmann
    try:
187 85f03e0d Michael Hanselmann
      try:
188 85f03e0d Michael Hanselmann
        count = len(job.ops)
189 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
190 85f03e0d Michael Hanselmann
          try:
191 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
192 85f03e0d Michael Hanselmann
193 85f03e0d Michael Hanselmann
            queue.acquire()
194 85f03e0d Michael Hanselmann
            try:
195 85f03e0d Michael Hanselmann
              job.run_op_index = idx
196 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
197 85f03e0d Michael Hanselmann
              op.result = None
198 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
199 85f03e0d Michael Hanselmann
200 38206f3c Iustin Pop
              input_opcode = op.input
201 85f03e0d Michael Hanselmann
            finally:
202 85f03e0d Michael Hanselmann
              queue.release()
203 85f03e0d Michael Hanselmann
204 38206f3c Iustin Pop
            result = proc.ExecOpCode(input_opcode, op.Log)
205 85f03e0d Michael Hanselmann
206 85f03e0d Michael Hanselmann
            queue.acquire()
207 85f03e0d Michael Hanselmann
            try:
208 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
209 85f03e0d Michael Hanselmann
              op.result = result
210 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
211 85f03e0d Michael Hanselmann
            finally:
212 85f03e0d Michael Hanselmann
              queue.release()
213 85f03e0d Michael Hanselmann
214 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
215 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
216 85f03e0d Michael Hanselmann
          except Exception, err:
217 85f03e0d Michael Hanselmann
            queue.acquire()
218 85f03e0d Michael Hanselmann
            try:
219 85f03e0d Michael Hanselmann
              try:
220 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
221 85f03e0d Michael Hanselmann
                op.result = str(err)
222 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
223 85f03e0d Michael Hanselmann
              finally:
224 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
225 85f03e0d Michael Hanselmann
            finally:
226 85f03e0d Michael Hanselmann
              queue.release()
227 85f03e0d Michael Hanselmann
            raise
228 85f03e0d Michael Hanselmann
229 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
230 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
231 85f03e0d Michael Hanselmann
      except:
232 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
233 e2715f69 Michael Hanselmann
    finally:
234 85f03e0d Michael Hanselmann
      queue.acquire()
235 85f03e0d Michael Hanselmann
      try:
236 85f03e0d Michael Hanselmann
        job_id = job.id
237 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
238 85f03e0d Michael Hanselmann
      finally:
239 85f03e0d Michael Hanselmann
        queue.release()
240 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
241 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
242 e2715f69 Michael Hanselmann
243 e2715f69 Michael Hanselmann
244 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
245 e2715f69 Michael Hanselmann
  def __init__(self, context):
246 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
247 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
248 e2715f69 Michael Hanselmann
    self.context = context
249 e2715f69 Michael Hanselmann
250 e2715f69 Michael Hanselmann
251 85f03e0d Michael Hanselmann
class JobQueue(object):
252 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253 f1da30e6 Michael Hanselmann
254 85f03e0d Michael Hanselmann
  def __init__(self, context):
255 ac0930b9 Iustin Pop
    self._memcache = {}
256 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
257 f1da30e6 Michael Hanselmann
258 85f03e0d Michael Hanselmann
    # Locking
259 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
260 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
261 85f03e0d Michael Hanselmann
    self.release = self._lock.release
262 85f03e0d Michael Hanselmann
263 0cb94105 Michael Hanselmann
    # Make sure our directories exists
264 0cb94105 Michael Hanselmann
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
265 0cb94105 Michael Hanselmann
      try:
266 0cb94105 Michael Hanselmann
        os.mkdir(path, 0700)
267 0cb94105 Michael Hanselmann
      except OSError, err:
268 0cb94105 Michael Hanselmann
        if err.errno not in (errno.EEXIST, ):
269 0cb94105 Michael Hanselmann
          raise
270 f1da30e6 Michael Hanselmann
271 f1da30e6 Michael Hanselmann
    # Get queue lock
272 f1da30e6 Michael Hanselmann
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
273 f1da30e6 Michael Hanselmann
    try:
274 f1da30e6 Michael Hanselmann
      utils.LockFile(self.lock_fd)
275 f1da30e6 Michael Hanselmann
    except:
276 f1da30e6 Michael Hanselmann
      self.lock_fd.close()
277 f1da30e6 Michael Hanselmann
      raise
278 f1da30e6 Michael Hanselmann
279 f1da30e6 Michael Hanselmann
    # Read version
280 f1da30e6 Michael Hanselmann
    try:
281 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
282 f1da30e6 Michael Hanselmann
    except IOError, err:
283 f1da30e6 Michael Hanselmann
      if err.errno not in (errno.ENOENT, ):
284 f1da30e6 Michael Hanselmann
        raise
285 f1da30e6 Michael Hanselmann
286 f1da30e6 Michael Hanselmann
      # Setup a new queue
287 f1da30e6 Michael Hanselmann
      self._InitQueueUnlocked()
288 f1da30e6 Michael Hanselmann
289 f1da30e6 Michael Hanselmann
      # Try to open again
290 f1da30e6 Michael Hanselmann
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
291 f1da30e6 Michael Hanselmann
292 f1da30e6 Michael Hanselmann
    try:
293 f1da30e6 Michael Hanselmann
      # Try to read version
294 f1da30e6 Michael Hanselmann
      version = int(version_fd.read(128))
295 f1da30e6 Michael Hanselmann
296 f1da30e6 Michael Hanselmann
      # Verify version
297 f1da30e6 Michael Hanselmann
      if version != constants.JOB_QUEUE_VERSION:
298 f1da30e6 Michael Hanselmann
        raise errors.JobQueueError("Found version %s, expected %s",
299 f1da30e6 Michael Hanselmann
                                   version, constants.JOB_QUEUE_VERSION)
300 f1da30e6 Michael Hanselmann
    finally:
301 f1da30e6 Michael Hanselmann
      version_fd.close()
302 f1da30e6 Michael Hanselmann
303 c4beba1c Iustin Pop
    self._last_serial = self._ReadSerial()
304 c4beba1c Iustin Pop
    if self._last_serial is None:
305 c4beba1c Iustin Pop
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
306 c4beba1c Iustin Pop
                                      " file")
307 c4beba1c Iustin Pop
308 85f03e0d Michael Hanselmann
    # Setup worker pool
309 85f03e0d Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(context)
310 85f03e0d Michael Hanselmann
311 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
312 85f03e0d Michael Hanselmann
    # we're still doing our work.
313 85f03e0d Michael Hanselmann
    self.acquire()
314 85f03e0d Michael Hanselmann
    try:
315 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
316 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
317 85f03e0d Michael Hanselmann
318 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
319 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
320 85f03e0d Michael Hanselmann
321 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
322 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
323 85f03e0d Michael Hanselmann
          try:
324 85f03e0d Michael Hanselmann
            for op in job.ops:
325 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
326 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
327 85f03e0d Michael Hanselmann
          finally:
328 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
329 85f03e0d Michael Hanselmann
    finally:
330 85f03e0d Michael Hanselmann
      self.release()
331 85f03e0d Michael Hanselmann
332 c4beba1c Iustin Pop
  @staticmethod
333 c4beba1c Iustin Pop
  def _ReadSerial():
334 c4beba1c Iustin Pop
    """Try to read the job serial file.
335 c4beba1c Iustin Pop

336 c4beba1c Iustin Pop
    @rtype: None or int
337 c4beba1c Iustin Pop
    @return: If the serial can be read, then it is returned. Otherwise None
338 c4beba1c Iustin Pop
             is returned.
339 c4beba1c Iustin Pop

340 c4beba1c Iustin Pop
    """
341 f1da30e6 Michael Hanselmann
    try:
342 c4beba1c Iustin Pop
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
343 c4beba1c Iustin Pop
      try:
344 c4beba1c Iustin Pop
        # Read last serial
345 c4beba1c Iustin Pop
        serial = int(serial_fd.read(1024).strip())
346 c4beba1c Iustin Pop
      finally:
347 c4beba1c Iustin Pop
        serial_fd.close()
348 c4beba1c Iustin Pop
    except (ValueError, EnvironmentError):
349 c4beba1c Iustin Pop
      serial = None
350 c4beba1c Iustin Pop
351 c4beba1c Iustin Pop
    return serial
352 f1da30e6 Michael Hanselmann
353 f1da30e6 Michael Hanselmann
  def _InitQueueUnlocked(self):
354 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
355 f1da30e6 Michael Hanselmann
356 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
357 f1da30e6 Michael Hanselmann
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
358 c4beba1c Iustin Pop
    if self._ReadSerial() is None:
359 c4beba1c Iustin Pop
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
360 c4beba1c Iustin Pop
                      data="%s\n" % 0)
361 f1da30e6 Michael Hanselmann
362 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
363 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
364 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
365 85f03e0d Michael Hanselmann
    if job_id < 0:
366 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
367 85f03e0d Michael Hanselmann
368 85f03e0d Michael Hanselmann
    return str(job_id)
369 85f03e0d Michael Hanselmann
370 c3f0a12f Iustin Pop
  def _NewSerialUnlocked(self, nodes):
371 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
372 f1da30e6 Michael Hanselmann

373 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
374 f1da30e6 Michael Hanselmann

375 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
376 f1da30e6 Michael Hanselmann

377 f1da30e6 Michael Hanselmann
    """
378 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
379 f1da30e6 Michael Hanselmann
380 f1da30e6 Michael Hanselmann
    # New number
381 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
382 f1da30e6 Michael Hanselmann
383 f1da30e6 Michael Hanselmann
    # Write to file
384 f1da30e6 Michael Hanselmann
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
385 f1da30e6 Michael Hanselmann
                    data="%s\n" % serial)
386 f1da30e6 Michael Hanselmann
387 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
388 f1da30e6 Michael Hanselmann
    self._last_serial = serial
389 f1da30e6 Michael Hanselmann
390 c3f0a12f Iustin Pop
    # Distribute the serial to the other nodes
391 c3f0a12f Iustin Pop
    try:
392 c3f0a12f Iustin Pop
      nodes.remove(self._my_hostname)
393 c3f0a12f Iustin Pop
    except ValueError:
394 c3f0a12f Iustin Pop
      pass
395 c3f0a12f Iustin Pop
396 c3f0a12f Iustin Pop
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
397 c3f0a12f Iustin Pop
    for node in nodes:
398 c3f0a12f Iustin Pop
      if not result[node]:
399 c3f0a12f Iustin Pop
        logging.error("copy of job queue file to node %s failed", node)
400 c3f0a12f Iustin Pop
401 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
402 f1da30e6 Michael Hanselmann
403 85f03e0d Michael Hanselmann
  @staticmethod
404 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
405 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
406 f1da30e6 Michael Hanselmann
407 85f03e0d Michael Hanselmann
  @staticmethod
408 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
409 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
410 0cb94105 Michael Hanselmann
411 85f03e0d Michael Hanselmann
  @classmethod
412 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
413 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
414 fae737ac Michael Hanselmann
    if m:
415 fae737ac Michael Hanselmann
      return m.group(1)
416 fae737ac Michael Hanselmann
    else:
417 fae737ac Michael Hanselmann
      return None
418 fae737ac Michael Hanselmann
419 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
420 911a495b Iustin Pop
    """Return all known job IDs.
421 911a495b Iustin Pop

422 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
423 911a495b Iustin Pop
    included. Currently this argument is unused.
424 911a495b Iustin Pop

425 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
426 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
427 ac0930b9 Iustin Pop
    extra IDs).
428 ac0930b9 Iustin Pop

429 911a495b Iustin Pop
    """
430 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
431 f0d874fe Iustin Pop
    jlist.sort()
432 f0d874fe Iustin Pop
    return jlist
433 911a495b Iustin Pop
434 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
435 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
436 f1da30e6 Michael Hanselmann
437 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
438 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
439 f1da30e6 Michael Hanselmann
440 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
441 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
442 f1da30e6 Michael Hanselmann
443 ac0930b9 Iustin Pop
    if job_id in self._memcache:
444 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
445 ac0930b9 Iustin Pop
      return self._memcache[job_id]
446 ac0930b9 Iustin Pop
447 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
448 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
449 f1da30e6 Michael Hanselmann
    try:
450 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
451 f1da30e6 Michael Hanselmann
    except IOError, err:
452 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
453 f1da30e6 Michael Hanselmann
        return None
454 f1da30e6 Michael Hanselmann
      raise
455 f1da30e6 Michael Hanselmann
    try:
456 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
457 f1da30e6 Michael Hanselmann
    finally:
458 f1da30e6 Michael Hanselmann
      fd.close()
459 f1da30e6 Michael Hanselmann
460 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
461 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
462 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
463 ac0930b9 Iustin Pop
    return job
464 f1da30e6 Michael Hanselmann
465 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
466 911a495b Iustin Pop
    if not job_ids:
467 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
468 f1da30e6 Michael Hanselmann
469 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
470 f1da30e6 Michael Hanselmann
471 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
472 85f03e0d Michael Hanselmann
  def SubmitJob(self, ops, nodes):
473 85f03e0d Michael Hanselmann
    """Create and store a new job.
474 f1da30e6 Michael Hanselmann

475 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
476 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
477 c3f0a12f Iustin Pop

478 c3f0a12f Iustin Pop
    @type ops: list
479 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
480 c3f0a12f Iustin Pop
    @type nodes: list
481 c3f0a12f Iustin Pop
    @param nodes: The list of nodes to which the new job serial will be
482 c3f0a12f Iustin Pop
                  distributed.
483 c3f0a12f Iustin Pop

484 c3f0a12f Iustin Pop
    """
485 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
486 f1da30e6 Michael Hanselmann
487 f1da30e6 Michael Hanselmann
    # Get job identifier
488 c3f0a12f Iustin Pop
    job_id = self._NewSerialUnlocked(nodes)
489 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
490 f1da30e6 Michael Hanselmann
491 f1da30e6 Michael Hanselmann
    # Write to disk
492 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
493 f1da30e6 Michael Hanselmann
494 205d71fd Michael Hanselmann
    logging.debug("Added new job %s to the cache", job_id)
495 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
496 ac0930b9 Iustin Pop
497 85f03e0d Michael Hanselmann
    # Add to worker pool
498 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
499 85f03e0d Michael Hanselmann
500 85f03e0d Michael Hanselmann
    return job.id
501 f1da30e6 Michael Hanselmann
502 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
503 f1da30e6 Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
504 f1da30e6 Michael Hanselmann
505 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
506 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
507 f1da30e6 Michael Hanselmann
    utils.WriteFile(filename,
508 f1da30e6 Michael Hanselmann
                    data=serializer.DumpJson(job.Serialize(), indent=False))
509 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
510 ac0930b9 Iustin Pop
511 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
512 ac0930b9 Iustin Pop
    """Clean the memory cache.
513 ac0930b9 Iustin Pop

514 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
515 ac0930b9 Iustin Pop
    cleaned.
516 ac0930b9 Iustin Pop

517 ac0930b9 Iustin Pop
    """
518 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
519 85f03e0d Michael Hanselmann
520 ac0930b9 Iustin Pop
    for job in self._memcache.values():
521 57f8615f Michael Hanselmann
      if job.id in exclude:
522 ac0930b9 Iustin Pop
        continue
523 85f03e0d Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
524 85f03e0d Michael Hanselmann
                                  constants.JOB_STATUS_RUNNING):
525 205d71fd Michael Hanselmann
        logging.debug("Cleaning job %s from the cache", job.id)
526 ac0930b9 Iustin Pop
        try:
527 ac0930b9 Iustin Pop
          del self._memcache[job.id]
528 ac0930b9 Iustin Pop
        except KeyError:
529 ac0930b9 Iustin Pop
          pass
530 f1da30e6 Michael Hanselmann
531 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
532 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
533 188c5e0a Michael Hanselmann
    """Cancels a job.
534 188c5e0a Michael Hanselmann

535 188c5e0a Michael Hanselmann
    @type job_id: string
536 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
537 188c5e0a Michael Hanselmann

538 188c5e0a Michael Hanselmann
    """
539 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
540 188c5e0a Michael Hanselmann
541 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
542 188c5e0a Michael Hanselmann
    if not job:
543 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
544 188c5e0a Michael Hanselmann
      return
545 188c5e0a Michael Hanselmann
546 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
547 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
548 188c5e0a Michael Hanselmann
      return
549 188c5e0a Michael Hanselmann
550 85f03e0d Michael Hanselmann
    try:
551 85f03e0d Michael Hanselmann
      for op in job.ops:
552 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
553 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
554 85f03e0d Michael Hanselmann
    finally:
555 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
556 188c5e0a Michael Hanselmann
557 c609f802 Michael Hanselmann
  @utils.LockedMethod
558 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
559 c609f802 Michael Hanselmann
    """Archives a job.
560 c609f802 Michael Hanselmann

561 c609f802 Michael Hanselmann
    @type job_id: string
562 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
563 c609f802 Michael Hanselmann

564 c609f802 Michael Hanselmann
    """
565 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
566 c609f802 Michael Hanselmann
567 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
568 c609f802 Michael Hanselmann
    if not job:
569 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
570 c609f802 Michael Hanselmann
      return
571 c609f802 Michael Hanselmann
572 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
573 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
574 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
575 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
576 c609f802 Michael Hanselmann
      return
577 c609f802 Michael Hanselmann
578 c609f802 Michael Hanselmann
    try:
579 c609f802 Michael Hanselmann
      old = self._GetJobPath(job.id)
580 c609f802 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
581 c609f802 Michael Hanselmann
582 c609f802 Michael Hanselmann
      os.rename(old, new)
583 c609f802 Michael Hanselmann
584 c609f802 Michael Hanselmann
      logging.debug("Successfully archived job %s", job.id)
585 c609f802 Michael Hanselmann
    finally:
586 c609f802 Michael Hanselmann
      # Cleaning the cache because we don't know what os.rename actually did
587 c609f802 Michael Hanselmann
      # and to be on the safe side.
588 c609f802 Michael Hanselmann
      self._CleanCacheUnlocked([])
589 f1da30e6 Michael Hanselmann
590 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
591 e2715f69 Michael Hanselmann
    row = []
592 e2715f69 Michael Hanselmann
    for fname in fields:
593 e2715f69 Michael Hanselmann
      if fname == "id":
594 e2715f69 Michael Hanselmann
        row.append(job.id)
595 e2715f69 Michael Hanselmann
      elif fname == "status":
596 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
597 af30b2fd Michael Hanselmann
      elif fname == "ops":
598 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
599 af30b2fd Michael Hanselmann
      elif fname == "opresult":
600 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
601 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
602 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
603 f1048938 Iustin Pop
      elif fname == "ticker":
604 85f03e0d Michael Hanselmann
        ji = job.run_op_index
605 f1048938 Iustin Pop
        if ji < 0:
606 f1048938 Iustin Pop
          lmsg = None
607 f1048938 Iustin Pop
        else:
608 85f03e0d Michael Hanselmann
          lmsg = job.ops[ji].RetrieveLog(-1)
609 f1048938 Iustin Pop
          # message might be empty here
610 f1048938 Iustin Pop
          if lmsg:
611 f1048938 Iustin Pop
            lmsg = lmsg[0]
612 f1048938 Iustin Pop
          else:
613 f1048938 Iustin Pop
            lmsg = None
614 f1048938 Iustin Pop
        row.append(lmsg)
615 e2715f69 Michael Hanselmann
      else:
616 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
617 e2715f69 Michael Hanselmann
    return row
618 e2715f69 Michael Hanselmann
619 85f03e0d Michael Hanselmann
  @utils.LockedMethod
620 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
621 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
622 e2715f69 Michael Hanselmann

623 e2715f69 Michael Hanselmann
    Args:
624 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
625 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
626 e2715f69 Michael Hanselmann

627 e2715f69 Michael Hanselmann
    """
628 85f03e0d Michael Hanselmann
    jobs = []
629 e2715f69 Michael Hanselmann
630 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
631 85f03e0d Michael Hanselmann
      if job is None:
632 85f03e0d Michael Hanselmann
        jobs.append(None)
633 85f03e0d Michael Hanselmann
      else:
634 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
635 e2715f69 Michael Hanselmann
636 85f03e0d Michael Hanselmann
    return jobs
637 e2715f69 Michael Hanselmann
638 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
639 e2715f69 Michael Hanselmann
  def Shutdown(self):
640 e2715f69 Michael Hanselmann
    """Stops the job queue.
641 e2715f69 Michael Hanselmann

642 e2715f69 Michael Hanselmann
    """
643 85f03e0d Michael Hanselmann
    assert self.lock_fd, "Queue should be open"
644 85f03e0d Michael Hanselmann
645 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
646 85f03e0d Michael Hanselmann
647 85f03e0d Michael Hanselmann
    self.lock_fd.close()
648 85f03e0d Michael Hanselmann
    self.lock_fd = None