Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ abc1f2ce

History | View | Annotate | Download (17.3 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 f1da30e6 Michael Hanselmann
import os
25 e2715f69 Michael Hanselmann
import logging
26 e2715f69 Michael Hanselmann
import threading
27 f1da30e6 Michael Hanselmann
import errno
28 f1da30e6 Michael Hanselmann
import re
29 f1048938 Iustin Pop
import time
30 498ae1cc Iustin Pop
31 e2715f69 Michael Hanselmann
from ganeti import constants
32 f1da30e6 Michael Hanselmann
from ganeti import serializer
33 e2715f69 Michael Hanselmann
from ganeti import workerpool
34 f1da30e6 Michael Hanselmann
from ganeti import opcodes
35 7a1ecaed Iustin Pop
from ganeti import errors
36 e2715f69 Michael Hanselmann
from ganeti import mcpu
37 7996a135 Iustin Pop
from ganeti import utils
38 04ab05ce Michael Hanselmann
from ganeti import jstore
39 c3f0a12f Iustin Pop
from ganeti import rpc
40 e2715f69 Michael Hanselmann
41 e2715f69 Michael Hanselmann
42 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
43 e2715f69 Michael Hanselmann
44 498ae1cc Iustin Pop
45 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
46 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
47 e2715f69 Michael Hanselmann

48 307149a8 Iustin Pop
  Access is synchronized by the '_lock' attribute.
49 e2715f69 Michael Hanselmann

50 f1048938 Iustin Pop
  The 'log' attribute holds the execution log and consists of tuples
51 f1048938 Iustin Pop
  of the form (timestamp, level, message).
52 f1048938 Iustin Pop

53 e2715f69 Michael Hanselmann
  """
54 85f03e0d Michael Hanselmann
  def __new__(cls, *args, **kwargs):
55 85f03e0d Michael Hanselmann
    obj = object.__new__(cls, *args, **kwargs)
56 85f03e0d Michael Hanselmann
    # Create a special lock for logging
57 85f03e0d Michael Hanselmann
    obj._log_lock = threading.Lock()
58 85f03e0d Michael Hanselmann
    return obj
59 f1da30e6 Michael Hanselmann
60 85f03e0d Michael Hanselmann
  def __init__(self, op):
61 85f03e0d Michael Hanselmann
    self.input = op
62 85f03e0d Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
63 85f03e0d Michael Hanselmann
    self.result = None
64 85f03e0d Michael Hanselmann
    self.log = []
65 f1da30e6 Michael Hanselmann
66 f1da30e6 Michael Hanselmann
  @classmethod
67 f1da30e6 Michael Hanselmann
  def Restore(cls, state):
68 85f03e0d Michael Hanselmann
    obj = _QueuedOpCode.__new__(cls)
69 85f03e0d Michael Hanselmann
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
70 85f03e0d Michael Hanselmann
    obj.status = state["status"]
71 85f03e0d Michael Hanselmann
    obj.result = state["result"]
72 85f03e0d Michael Hanselmann
    obj.log = state["log"]
73 f1da30e6 Michael Hanselmann
    return obj
74 f1da30e6 Michael Hanselmann
75 f1da30e6 Michael Hanselmann
  def Serialize(self):
76 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
77 85f03e0d Michael Hanselmann
    try:
78 85f03e0d Michael Hanselmann
      return {
79 85f03e0d Michael Hanselmann
        "input": self.input.__getstate__(),
80 85f03e0d Michael Hanselmann
        "status": self.status,
81 85f03e0d Michael Hanselmann
        "result": self.result,
82 85f03e0d Michael Hanselmann
        "log": self.log,
83 85f03e0d Michael Hanselmann
        }
84 85f03e0d Michael Hanselmann
    finally:
85 85f03e0d Michael Hanselmann
      self._log_lock.release()
86 e2715f69 Michael Hanselmann
87 f1048938 Iustin Pop
  def Log(self, *args):
88 f1048938 Iustin Pop
    """Append a log entry.
89 f1048938 Iustin Pop

90 f1048938 Iustin Pop
    """
91 85f03e0d Michael Hanselmann
    assert len(args) < 3
92 f1048938 Iustin Pop
93 f1048938 Iustin Pop
    if len(args) == 1:
94 f1048938 Iustin Pop
      log_type = constants.ELOG_MESSAGE
95 f1048938 Iustin Pop
      log_msg = args[0]
96 f1048938 Iustin Pop
    else:
97 f1048938 Iustin Pop
      log_type, log_msg = args
98 f1048938 Iustin Pop
99 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
100 85f03e0d Michael Hanselmann
    try:
101 85f03e0d Michael Hanselmann
      self.log.append((time.time(), log_type, log_msg))
102 85f03e0d Michael Hanselmann
    finally:
103 85f03e0d Michael Hanselmann
      self._log_lock.release()
104 85f03e0d Michael Hanselmann
105 f1048938 Iustin Pop
  def RetrieveLog(self, start_at=0):
106 f1048938 Iustin Pop
    """Retrieve (a part of) the execution log.
107 f1048938 Iustin Pop

108 f1048938 Iustin Pop
    """
109 85f03e0d Michael Hanselmann
    self._log_lock.acquire()
110 85f03e0d Michael Hanselmann
    try:
111 85f03e0d Michael Hanselmann
      return self.log[start_at:]
112 85f03e0d Michael Hanselmann
    finally:
113 85f03e0d Michael Hanselmann
      self._log_lock.release()
114 f1048938 Iustin Pop
115 e2715f69 Michael Hanselmann
116 e2715f69 Michael Hanselmann
class _QueuedJob(object):
117 e2715f69 Michael Hanselmann
  """In-memory job representation.
118 e2715f69 Michael Hanselmann

119 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
120 e2715f69 Michael Hanselmann

121 e2715f69 Michael Hanselmann
  """
122 85f03e0d Michael Hanselmann
  def __init__(self, queue, job_id, ops):
123 e2715f69 Michael Hanselmann
    if not ops:
124 e2715f69 Michael Hanselmann
      # TODO
125 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
126 e2715f69 Michael Hanselmann
127 85f03e0d Michael Hanselmann
    self.queue = queue
128 f1da30e6 Michael Hanselmann
    self.id = job_id
129 85f03e0d Michael Hanselmann
    self.ops = [_QueuedOpCode(op) for op in ops]
130 85f03e0d Michael Hanselmann
    self.run_op_index = -1
131 f1da30e6 Michael Hanselmann
132 f1da30e6 Michael Hanselmann
  @classmethod
133 85f03e0d Michael Hanselmann
  def Restore(cls, queue, state):
134 85f03e0d Michael Hanselmann
    obj = _QueuedJob.__new__(cls)
135 85f03e0d Michael Hanselmann
    obj.queue = queue
136 85f03e0d Michael Hanselmann
    obj.id = state["id"]
137 85f03e0d Michael Hanselmann
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
138 85f03e0d Michael Hanselmann
    obj.run_op_index = state["run_op_index"]
139 f1da30e6 Michael Hanselmann
    return obj
140 f1da30e6 Michael Hanselmann
141 f1da30e6 Michael Hanselmann
  def Serialize(self):
142 f1da30e6 Michael Hanselmann
    return {
143 f1da30e6 Michael Hanselmann
      "id": self.id,
144 85f03e0d Michael Hanselmann
      "ops": [op.Serialize() for op in self.ops],
145 f1048938 Iustin Pop
      "run_op_index": self.run_op_index,
146 f1da30e6 Michael Hanselmann
      }
147 f1da30e6 Michael Hanselmann
148 85f03e0d Michael Hanselmann
  def CalcStatus(self):
149 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
150 e2715f69 Michael Hanselmann
151 e2715f69 Michael Hanselmann
    all_success = True
152 85f03e0d Michael Hanselmann
    for op in self.ops:
153 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
154 e2715f69 Michael Hanselmann
        continue
155 e2715f69 Michael Hanselmann
156 e2715f69 Michael Hanselmann
      all_success = False
157 e2715f69 Michael Hanselmann
158 85f03e0d Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
159 e2715f69 Michael Hanselmann
        pass
160 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
161 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
162 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
163 f1da30e6 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
164 f1da30e6 Michael Hanselmann
        # The whole job fails if one opcode failed
165 f1da30e6 Michael Hanselmann
        break
166 85f03e0d Michael Hanselmann
      elif op.status == constants.OP_STATUS_CANCELED:
167 4cb1d919 Michael Hanselmann
        status = constants.OP_STATUS_CANCELED
168 4cb1d919 Michael Hanselmann
        break
169 e2715f69 Michael Hanselmann
170 e2715f69 Michael Hanselmann
    if all_success:
171 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
172 e2715f69 Michael Hanselmann
173 e2715f69 Michael Hanselmann
    return status
174 e2715f69 Michael Hanselmann
175 f1048938 Iustin Pop
176 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
177 85f03e0d Michael Hanselmann
  def RunTask(self, job):
178 e2715f69 Michael Hanselmann
    """Job executor.
179 e2715f69 Michael Hanselmann

180 85f03e0d Michael Hanselmann
    This functions processes a job.
181 e2715f69 Michael Hanselmann

182 e2715f69 Michael Hanselmann
    """
183 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
184 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
185 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
186 85f03e0d Michael Hanselmann
    queue = job.queue
187 e2715f69 Michael Hanselmann
    try:
188 85f03e0d Michael Hanselmann
      try:
189 85f03e0d Michael Hanselmann
        count = len(job.ops)
190 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
191 85f03e0d Michael Hanselmann
          try:
192 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
193 85f03e0d Michael Hanselmann
194 85f03e0d Michael Hanselmann
            queue.acquire()
195 85f03e0d Michael Hanselmann
            try:
196 85f03e0d Michael Hanselmann
              job.run_op_index = idx
197 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_RUNNING
198 85f03e0d Michael Hanselmann
              op.result = None
199 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
200 85f03e0d Michael Hanselmann
201 38206f3c Iustin Pop
              input_opcode = op.input
202 85f03e0d Michael Hanselmann
            finally:
203 85f03e0d Michael Hanselmann
              queue.release()
204 85f03e0d Michael Hanselmann
205 38206f3c Iustin Pop
            result = proc.ExecOpCode(input_opcode, op.Log)
206 85f03e0d Michael Hanselmann
207 85f03e0d Michael Hanselmann
            queue.acquire()
208 85f03e0d Michael Hanselmann
            try:
209 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
210 85f03e0d Michael Hanselmann
              op.result = result
211 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
212 85f03e0d Michael Hanselmann
            finally:
213 85f03e0d Michael Hanselmann
              queue.release()
214 85f03e0d Michael Hanselmann
215 85f03e0d Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
216 85f03e0d Michael Hanselmann
                          idx + 1, count, op)
217 85f03e0d Michael Hanselmann
          except Exception, err:
218 85f03e0d Michael Hanselmann
            queue.acquire()
219 85f03e0d Michael Hanselmann
            try:
220 85f03e0d Michael Hanselmann
              try:
221 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
222 85f03e0d Michael Hanselmann
                op.result = str(err)
223 85f03e0d Michael Hanselmann
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
224 85f03e0d Michael Hanselmann
              finally:
225 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
226 85f03e0d Michael Hanselmann
            finally:
227 85f03e0d Michael Hanselmann
              queue.release()
228 85f03e0d Michael Hanselmann
            raise
229 85f03e0d Michael Hanselmann
230 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
231 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
232 85f03e0d Michael Hanselmann
      except:
233 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
234 e2715f69 Michael Hanselmann
    finally:
235 85f03e0d Michael Hanselmann
      queue.acquire()
236 85f03e0d Michael Hanselmann
      try:
237 85f03e0d Michael Hanselmann
        job_id = job.id
238 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
239 85f03e0d Michael Hanselmann
      finally:
240 85f03e0d Michael Hanselmann
        queue.release()
241 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
242 85f03e0d Michael Hanselmann
                    self.worker_id, job_id, status)
243 e2715f69 Michael Hanselmann
244 e2715f69 Michael Hanselmann
245 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
246 5bdce580 Michael Hanselmann
  def __init__(self, queue):
247 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
248 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
249 5bdce580 Michael Hanselmann
    self.queue = queue
250 e2715f69 Michael Hanselmann
251 e2715f69 Michael Hanselmann
252 85f03e0d Michael Hanselmann
class JobQueue(object):
253 bac5ffc3 Oleksiy Mishchenko
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
254 f1da30e6 Michael Hanselmann
255 db37da70 Michael Hanselmann
  def _RequireOpenQueue(fn):
256 db37da70 Michael Hanselmann
    """Decorator for "public" functions.
257 db37da70 Michael Hanselmann

258 db37da70 Michael Hanselmann
    This function should be used for all "public" functions. That is, functions
259 db37da70 Michael Hanselmann
    usually called from other classes.
260 db37da70 Michael Hanselmann

261 db37da70 Michael Hanselmann
    Important: Use this decorator only after utils.LockedMethod!
262 db37da70 Michael Hanselmann

263 db37da70 Michael Hanselmann
    Example:
264 db37da70 Michael Hanselmann
      @utils.LockedMethod
265 db37da70 Michael Hanselmann
      @_RequireOpenQueue
266 db37da70 Michael Hanselmann
      def Example(self):
267 db37da70 Michael Hanselmann
        pass
268 db37da70 Michael Hanselmann

269 db37da70 Michael Hanselmann
    """
270 db37da70 Michael Hanselmann
    def wrapper(self, *args, **kwargs):
271 04ab05ce Michael Hanselmann
      assert self._queue_lock is not None, "Queue should be open"
272 db37da70 Michael Hanselmann
      return fn(self, *args, **kwargs)
273 db37da70 Michael Hanselmann
    return wrapper
274 db37da70 Michael Hanselmann
275 85f03e0d Michael Hanselmann
  def __init__(self, context):
276 5bdce580 Michael Hanselmann
    self.context = context
277 ac0930b9 Iustin Pop
    self._memcache = {}
278 c3f0a12f Iustin Pop
    self._my_hostname = utils.HostInfo().name
279 f1da30e6 Michael Hanselmann
280 85f03e0d Michael Hanselmann
    # Locking
281 85f03e0d Michael Hanselmann
    self._lock = threading.Lock()
282 85f03e0d Michael Hanselmann
    self.acquire = self._lock.acquire
283 85f03e0d Michael Hanselmann
    self.release = self._lock.release
284 85f03e0d Michael Hanselmann
285 04ab05ce Michael Hanselmann
    # Initialize
286 5d6fb8eb Michael Hanselmann
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
287 f1da30e6 Michael Hanselmann
288 04ab05ce Michael Hanselmann
    # Read serial file
289 04ab05ce Michael Hanselmann
    self._last_serial = jstore.ReadSerial()
290 04ab05ce Michael Hanselmann
    assert self._last_serial is not None, ("Serial file was modified between"
291 04ab05ce Michael Hanselmann
                                           " check in jstore and here")
292 c4beba1c Iustin Pop
293 23752136 Michael Hanselmann
    # Get initial list of nodes
294 8e00939c Michael Hanselmann
    self._nodes = set(self.context.cfg.GetNodeList())
295 8e00939c Michael Hanselmann
296 8e00939c Michael Hanselmann
    # Remove master node
297 8e00939c Michael Hanselmann
    try:
298 8e00939c Michael Hanselmann
      self._nodes.remove(self._my_hostname)
299 8e00939c Michael Hanselmann
    except ValueError:
300 8e00939c Michael Hanselmann
      pass
301 23752136 Michael Hanselmann
302 23752136 Michael Hanselmann
    # TODO: Check consistency across nodes
303 23752136 Michael Hanselmann
304 85f03e0d Michael Hanselmann
    # Setup worker pool
305 5bdce580 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(self)
306 85f03e0d Michael Hanselmann
307 85f03e0d Michael Hanselmann
    # We need to lock here because WorkerPool.AddTask() may start a job while
308 85f03e0d Michael Hanselmann
    # we're still doing our work.
309 85f03e0d Michael Hanselmann
    self.acquire()
310 85f03e0d Michael Hanselmann
    try:
311 85f03e0d Michael Hanselmann
      for job in self._GetJobsUnlocked(None):
312 85f03e0d Michael Hanselmann
        status = job.CalcStatus()
313 85f03e0d Michael Hanselmann
314 85f03e0d Michael Hanselmann
        if status in (constants.JOB_STATUS_QUEUED, ):
315 85f03e0d Michael Hanselmann
          self._wpool.AddTask(job)
316 85f03e0d Michael Hanselmann
317 85f03e0d Michael Hanselmann
        elif status in (constants.JOB_STATUS_RUNNING, ):
318 85f03e0d Michael Hanselmann
          logging.warning("Unfinished job %s found: %s", job.id, job)
319 85f03e0d Michael Hanselmann
          try:
320 85f03e0d Michael Hanselmann
            for op in job.ops:
321 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_ERROR
322 85f03e0d Michael Hanselmann
              op.result = "Unclean master daemon shutdown"
323 85f03e0d Michael Hanselmann
          finally:
324 85f03e0d Michael Hanselmann
            self.UpdateJobUnlocked(job)
325 85f03e0d Michael Hanselmann
    finally:
326 85f03e0d Michael Hanselmann
      self.release()
327 85f03e0d Michael Hanselmann
328 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
329 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
330 d2e03a33 Michael Hanselmann
  def AddNode(self, node_name):
331 d2e03a33 Michael Hanselmann
    assert node_name != self._my_hostname
332 23752136 Michael Hanselmann
333 9f774ee8 Michael Hanselmann
    # Clean queue directory on added node
334 9f774ee8 Michael Hanselmann
    rpc.call_jobqueue_purge(node_name)
335 23752136 Michael Hanselmann
336 d2e03a33 Michael Hanselmann
    # Upload the whole queue excluding archived jobs
337 d2e03a33 Michael Hanselmann
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
338 23752136 Michael Hanselmann
339 d2e03a33 Michael Hanselmann
    # Upload current serial file
340 d2e03a33 Michael Hanselmann
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
341 d2e03a33 Michael Hanselmann
342 d2e03a33 Michael Hanselmann
    for file_name in files:
343 9f774ee8 Michael Hanselmann
      # Read file content
344 9f774ee8 Michael Hanselmann
      fd = open(file_name, "r")
345 9f774ee8 Michael Hanselmann
      try:
346 9f774ee8 Michael Hanselmann
        content = fd.read()
347 9f774ee8 Michael Hanselmann
      finally:
348 9f774ee8 Michael Hanselmann
        fd.close()
349 9f774ee8 Michael Hanselmann
350 9f774ee8 Michael Hanselmann
      result = rpc.call_jobqueue_update([node_name], file_name, content)
351 d2e03a33 Michael Hanselmann
      if not result[node_name]:
352 d2e03a33 Michael Hanselmann
        logging.error("Failed to upload %s to %s", file_name, node_name)
353 d2e03a33 Michael Hanselmann
354 d2e03a33 Michael Hanselmann
    self._nodes.add(node_name)
355 d2e03a33 Michael Hanselmann
356 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
357 d2e03a33 Michael Hanselmann
  @_RequireOpenQueue
358 d2e03a33 Michael Hanselmann
  def RemoveNode(self, node_name):
359 23752136 Michael Hanselmann
    try:
360 d2e03a33 Michael Hanselmann
      # The queue is removed by the "leave node" RPC call.
361 d2e03a33 Michael Hanselmann
      self._nodes.remove(node_name)
362 d2e03a33 Michael Hanselmann
    except KeyError:
363 23752136 Michael Hanselmann
      pass
364 23752136 Michael Hanselmann
365 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
366 8e00939c Michael Hanselmann
    """Writes a file locally and then replicates it to all nodes.
367 8e00939c Michael Hanselmann

368 8e00939c Michael Hanselmann
    """
369 8e00939c Michael Hanselmann
    utils.WriteFile(file_name, data=data)
370 8e00939c Michael Hanselmann
371 23752136 Michael Hanselmann
    failed_nodes = 0
372 9f774ee8 Michael Hanselmann
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
373 8e00939c Michael Hanselmann
    for node in self._nodes:
374 23752136 Michael Hanselmann
      if not result[node]:
375 23752136 Michael Hanselmann
        failed_nodes += 1
376 23752136 Michael Hanselmann
        logging.error("Copy of job queue file to node %s failed", node)
377 23752136 Michael Hanselmann
378 23752136 Michael Hanselmann
    # TODO: check failed_nodes
379 23752136 Michael Hanselmann
380 abc1f2ce Michael Hanselmann
  def _RenameFileUnlocked(self, old, new):
381 abc1f2ce Michael Hanselmann
    os.rename(old, new)
382 abc1f2ce Michael Hanselmann
383 abc1f2ce Michael Hanselmann
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
384 abc1f2ce Michael Hanselmann
    for node in self._nodes:
385 abc1f2ce Michael Hanselmann
      if not result[node]:
386 abc1f2ce Michael Hanselmann
        logging.error("Moving %s to %s failed on %s", old, new, node)
387 abc1f2ce Michael Hanselmann
388 abc1f2ce Michael Hanselmann
    # TODO: check failed nodes
389 abc1f2ce Michael Hanselmann
390 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
391 85f03e0d Michael Hanselmann
    if not isinstance(job_id, (int, long)):
392 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
393 85f03e0d Michael Hanselmann
    if job_id < 0:
394 85f03e0d Michael Hanselmann
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
395 85f03e0d Michael Hanselmann
396 85f03e0d Michael Hanselmann
    return str(job_id)
397 85f03e0d Michael Hanselmann
398 4c848b18 Michael Hanselmann
  def _NewSerialUnlocked(self):
399 f1da30e6 Michael Hanselmann
    """Generates a new job identifier.
400 f1da30e6 Michael Hanselmann

401 f1da30e6 Michael Hanselmann
    Job identifiers are unique during the lifetime of a cluster.
402 f1da30e6 Michael Hanselmann

403 f1da30e6 Michael Hanselmann
    Returns: A string representing the job identifier.
404 f1da30e6 Michael Hanselmann

405 f1da30e6 Michael Hanselmann
    """
406 f1da30e6 Michael Hanselmann
    # New number
407 f1da30e6 Michael Hanselmann
    serial = self._last_serial + 1
408 f1da30e6 Michael Hanselmann
409 f1da30e6 Michael Hanselmann
    # Write to file
410 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
411 23752136 Michael Hanselmann
                                        "%s\n" % serial)
412 f1da30e6 Michael Hanselmann
413 f1da30e6 Michael Hanselmann
    # Keep it only if we were able to write the file
414 f1da30e6 Michael Hanselmann
    self._last_serial = serial
415 f1da30e6 Michael Hanselmann
416 85f03e0d Michael Hanselmann
    return self._FormatJobID(serial)
417 f1da30e6 Michael Hanselmann
418 85f03e0d Michael Hanselmann
  @staticmethod
419 85f03e0d Michael Hanselmann
  def _GetJobPath(job_id):
420 f1da30e6 Michael Hanselmann
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
421 f1da30e6 Michael Hanselmann
422 85f03e0d Michael Hanselmann
  @staticmethod
423 85f03e0d Michael Hanselmann
  def _GetArchivedJobPath(job_id):
424 0cb94105 Michael Hanselmann
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
425 0cb94105 Michael Hanselmann
426 85f03e0d Michael Hanselmann
  @classmethod
427 85f03e0d Michael Hanselmann
  def _ExtractJobID(cls, name):
428 85f03e0d Michael Hanselmann
    m = cls._RE_JOB_FILE.match(name)
429 fae737ac Michael Hanselmann
    if m:
430 fae737ac Michael Hanselmann
      return m.group(1)
431 fae737ac Michael Hanselmann
    else:
432 fae737ac Michael Hanselmann
      return None
433 fae737ac Michael Hanselmann
434 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
435 911a495b Iustin Pop
    """Return all known job IDs.
436 911a495b Iustin Pop

437 911a495b Iustin Pop
    If the parameter archived is True, archived jobs IDs will be
438 911a495b Iustin Pop
    included. Currently this argument is unused.
439 911a495b Iustin Pop

440 ac0930b9 Iustin Pop
    The method only looks at disk because it's a requirement that all
441 ac0930b9 Iustin Pop
    jobs are present on disk (so in the _memcache we don't have any
442 ac0930b9 Iustin Pop
    extra IDs).
443 ac0930b9 Iustin Pop

444 911a495b Iustin Pop
    """
445 fae737ac Michael Hanselmann
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
446 f0d874fe Iustin Pop
    jlist.sort()
447 f0d874fe Iustin Pop
    return jlist
448 911a495b Iustin Pop
449 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
450 f1da30e6 Michael Hanselmann
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
451 f1da30e6 Michael Hanselmann
            if self._RE_JOB_FILE.match(name)]
452 f1da30e6 Michael Hanselmann
453 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
454 ac0930b9 Iustin Pop
    if job_id in self._memcache:
455 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
456 ac0930b9 Iustin Pop
      return self._memcache[job_id]
457 ac0930b9 Iustin Pop
458 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
459 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
460 f1da30e6 Michael Hanselmann
    try:
461 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
462 f1da30e6 Michael Hanselmann
    except IOError, err:
463 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
464 f1da30e6 Michael Hanselmann
        return None
465 f1da30e6 Michael Hanselmann
      raise
466 f1da30e6 Michael Hanselmann
    try:
467 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
468 f1da30e6 Michael Hanselmann
    finally:
469 f1da30e6 Michael Hanselmann
      fd.close()
470 f1da30e6 Michael Hanselmann
471 ac0930b9 Iustin Pop
    job = _QueuedJob.Restore(self, data)
472 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
473 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
474 ac0930b9 Iustin Pop
    return job
475 f1da30e6 Michael Hanselmann
476 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
477 911a495b Iustin Pop
    if not job_ids:
478 911a495b Iustin Pop
      job_ids = self._GetJobIDsUnlocked()
479 f1da30e6 Michael Hanselmann
480 911a495b Iustin Pop
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
481 f1da30e6 Michael Hanselmann
482 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
483 db37da70 Michael Hanselmann
  @_RequireOpenQueue
484 4c848b18 Michael Hanselmann
  def SubmitJob(self, ops):
485 85f03e0d Michael Hanselmann
    """Create and store a new job.
486 f1da30e6 Michael Hanselmann

487 85f03e0d Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
488 85f03e0d Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
489 c3f0a12f Iustin Pop

490 c3f0a12f Iustin Pop
    @type ops: list
491 205d71fd Michael Hanselmann
    @param ops: The list of OpCodes that will become the new job.
492 c3f0a12f Iustin Pop

493 c3f0a12f Iustin Pop
    """
494 f1da30e6 Michael Hanselmann
    # Get job identifier
495 4c848b18 Michael Hanselmann
    job_id = self._NewSerialUnlocked()
496 f1da30e6 Michael Hanselmann
    job = _QueuedJob(self, job_id, ops)
497 f1da30e6 Michael Hanselmann
498 f1da30e6 Michael Hanselmann
    # Write to disk
499 85f03e0d Michael Hanselmann
    self.UpdateJobUnlocked(job)
500 f1da30e6 Michael Hanselmann
501 205d71fd Michael Hanselmann
    logging.debug("Added new job %s to the cache", job_id)
502 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
503 ac0930b9 Iustin Pop
504 85f03e0d Michael Hanselmann
    # Add to worker pool
505 85f03e0d Michael Hanselmann
    self._wpool.AddTask(job)
506 85f03e0d Michael Hanselmann
507 85f03e0d Michael Hanselmann
    return job.id
508 f1da30e6 Michael Hanselmann
509 db37da70 Michael Hanselmann
  @_RequireOpenQueue
510 85f03e0d Michael Hanselmann
  def UpdateJobUnlocked(self, job):
511 f1da30e6 Michael Hanselmann
    filename = self._GetJobPath(job.id)
512 23752136 Michael Hanselmann
    data = serializer.DumpJson(job.Serialize(), indent=False)
513 f1da30e6 Michael Hanselmann
    logging.debug("Writing job %s to %s", job.id, filename)
514 23752136 Michael Hanselmann
    self._WriteAndReplicateFileUnlocked(filename, data)
515 57f8615f Michael Hanselmann
    self._CleanCacheUnlocked([job.id])
516 ac0930b9 Iustin Pop
517 57f8615f Michael Hanselmann
  def _CleanCacheUnlocked(self, exclude):
518 ac0930b9 Iustin Pop
    """Clean the memory cache.
519 ac0930b9 Iustin Pop

520 ac0930b9 Iustin Pop
    The exceptions argument contains job IDs that should not be
521 ac0930b9 Iustin Pop
    cleaned.
522 ac0930b9 Iustin Pop

523 ac0930b9 Iustin Pop
    """
524 57f8615f Michael Hanselmann
    assert isinstance(exclude, list)
525 85f03e0d Michael Hanselmann
526 ac0930b9 Iustin Pop
    for job in self._memcache.values():
527 57f8615f Michael Hanselmann
      if job.id in exclude:
528 ac0930b9 Iustin Pop
        continue
529 85f03e0d Michael Hanselmann
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
530 85f03e0d Michael Hanselmann
                                  constants.JOB_STATUS_RUNNING):
531 205d71fd Michael Hanselmann
        logging.debug("Cleaning job %s from the cache", job.id)
532 ac0930b9 Iustin Pop
        try:
533 ac0930b9 Iustin Pop
          del self._memcache[job.id]
534 ac0930b9 Iustin Pop
        except KeyError:
535 ac0930b9 Iustin Pop
          pass
536 f1da30e6 Michael Hanselmann
537 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
538 db37da70 Michael Hanselmann
  @_RequireOpenQueue
539 188c5e0a Michael Hanselmann
  def CancelJob(self, job_id):
540 188c5e0a Michael Hanselmann
    """Cancels a job.
541 188c5e0a Michael Hanselmann

542 188c5e0a Michael Hanselmann
    @type job_id: string
543 188c5e0a Michael Hanselmann
    @param job_id: Job ID of job to be cancelled.
544 188c5e0a Michael Hanselmann

545 188c5e0a Michael Hanselmann
    """
546 188c5e0a Michael Hanselmann
    logging.debug("Cancelling job %s", job_id)
547 188c5e0a Michael Hanselmann
548 85f03e0d Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
549 188c5e0a Michael Hanselmann
    if not job:
550 188c5e0a Michael Hanselmann
      logging.debug("Job %s not found", job_id)
551 188c5e0a Michael Hanselmann
      return
552 188c5e0a Michael Hanselmann
553 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
554 188c5e0a Michael Hanselmann
      logging.debug("Job %s is no longer in the queue", job.id)
555 188c5e0a Michael Hanselmann
      return
556 188c5e0a Michael Hanselmann
557 85f03e0d Michael Hanselmann
    try:
558 85f03e0d Michael Hanselmann
      for op in job.ops:
559 85f03e0d Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
560 85f03e0d Michael Hanselmann
        op.result = "Job cancelled by request"
561 85f03e0d Michael Hanselmann
    finally:
562 85f03e0d Michael Hanselmann
      self.UpdateJobUnlocked(job)
563 188c5e0a Michael Hanselmann
564 c609f802 Michael Hanselmann
  @utils.LockedMethod
565 db37da70 Michael Hanselmann
  @_RequireOpenQueue
566 f1da30e6 Michael Hanselmann
  def ArchiveJob(self, job_id):
567 c609f802 Michael Hanselmann
    """Archives a job.
568 c609f802 Michael Hanselmann

569 c609f802 Michael Hanselmann
    @type job_id: string
570 c609f802 Michael Hanselmann
    @param job_id: Job ID of job to be archived.
571 c609f802 Michael Hanselmann

572 c609f802 Michael Hanselmann
    """
573 c609f802 Michael Hanselmann
    logging.debug("Archiving job %s", job_id)
574 c609f802 Michael Hanselmann
575 c609f802 Michael Hanselmann
    job = self._LoadJobUnlocked(job_id)
576 c609f802 Michael Hanselmann
    if not job:
577 c609f802 Michael Hanselmann
      logging.debug("Job %s not found", job_id)
578 c609f802 Michael Hanselmann
      return
579 c609f802 Michael Hanselmann
580 85f03e0d Michael Hanselmann
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
581 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_SUCCESS,
582 85f03e0d Michael Hanselmann
                                constants.JOB_STATUS_ERROR):
583 85f03e0d Michael Hanselmann
      logging.debug("Job %s is not yet done", job.id)
584 c609f802 Michael Hanselmann
      return
585 c609f802 Michael Hanselmann
586 c609f802 Michael Hanselmann
    try:
587 c609f802 Michael Hanselmann
      old = self._GetJobPath(job.id)
588 c609f802 Michael Hanselmann
      new = self._GetArchivedJobPath(job.id)
589 c609f802 Michael Hanselmann
590 abc1f2ce Michael Hanselmann
      self._RenameFileUnlocked(old, new)
591 c609f802 Michael Hanselmann
592 c609f802 Michael Hanselmann
      logging.debug("Successfully archived job %s", job.id)
593 c609f802 Michael Hanselmann
    finally:
594 c609f802 Michael Hanselmann
      # Cleaning the cache because we don't know what os.rename actually did
595 c609f802 Michael Hanselmann
      # and to be on the safe side.
596 c609f802 Michael Hanselmann
      self._CleanCacheUnlocked([])
597 f1da30e6 Michael Hanselmann
598 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
599 e2715f69 Michael Hanselmann
    row = []
600 e2715f69 Michael Hanselmann
    for fname in fields:
601 e2715f69 Michael Hanselmann
      if fname == "id":
602 e2715f69 Michael Hanselmann
        row.append(job.id)
603 e2715f69 Michael Hanselmann
      elif fname == "status":
604 85f03e0d Michael Hanselmann
        row.append(job.CalcStatus())
605 af30b2fd Michael Hanselmann
      elif fname == "ops":
606 85f03e0d Michael Hanselmann
        row.append([op.input.__getstate__() for op in job.ops])
607 af30b2fd Michael Hanselmann
      elif fname == "opresult":
608 85f03e0d Michael Hanselmann
        row.append([op.result for op in job.ops])
609 af30b2fd Michael Hanselmann
      elif fname == "opstatus":
610 85f03e0d Michael Hanselmann
        row.append([op.status for op in job.ops])
611 f1048938 Iustin Pop
      elif fname == "ticker":
612 85f03e0d Michael Hanselmann
        ji = job.run_op_index
613 f1048938 Iustin Pop
        if ji < 0:
614 f1048938 Iustin Pop
          lmsg = None
615 f1048938 Iustin Pop
        else:
616 85f03e0d Michael Hanselmann
          lmsg = job.ops[ji].RetrieveLog(-1)
617 f1048938 Iustin Pop
          # message might be empty here
618 f1048938 Iustin Pop
          if lmsg:
619 f1048938 Iustin Pop
            lmsg = lmsg[0]
620 f1048938 Iustin Pop
          else:
621 f1048938 Iustin Pop
            lmsg = None
622 f1048938 Iustin Pop
        row.append(lmsg)
623 e2715f69 Michael Hanselmann
      else:
624 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
625 e2715f69 Michael Hanselmann
    return row
626 e2715f69 Michael Hanselmann
627 85f03e0d Michael Hanselmann
  @utils.LockedMethod
628 db37da70 Michael Hanselmann
  @_RequireOpenQueue
629 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
630 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
631 e2715f69 Michael Hanselmann

632 e2715f69 Michael Hanselmann
    Args:
633 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
634 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
635 e2715f69 Michael Hanselmann

636 e2715f69 Michael Hanselmann
    """
637 85f03e0d Michael Hanselmann
    jobs = []
638 e2715f69 Michael Hanselmann
639 85f03e0d Michael Hanselmann
    for job in self._GetJobsUnlocked(job_ids):
640 85f03e0d Michael Hanselmann
      if job is None:
641 85f03e0d Michael Hanselmann
        jobs.append(None)
642 85f03e0d Michael Hanselmann
      else:
643 85f03e0d Michael Hanselmann
        jobs.append(self._GetJobInfoUnlocked(job, fields))
644 e2715f69 Michael Hanselmann
645 85f03e0d Michael Hanselmann
    return jobs
646 e2715f69 Michael Hanselmann
647 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
648 db37da70 Michael Hanselmann
  @_RequireOpenQueue
649 e2715f69 Michael Hanselmann
  def Shutdown(self):
650 e2715f69 Michael Hanselmann
    """Stops the job queue.
651 e2715f69 Michael Hanselmann

652 e2715f69 Michael Hanselmann
    """
653 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()
654 85f03e0d Michael Hanselmann
655 04ab05ce Michael Hanselmann
    self._queue_lock.Close()
656 04ab05ce Michael Hanselmann
    self._queue_lock = None