Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ caeffaa0

History | View | Annotate | Download (40.7 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 5685c1a5 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 6c5a7090 Michael Hanselmann
"""Module implementing the job queue handling.
23 6c5a7090 Michael Hanselmann

24 ea03467c Iustin Pop
Locking: there's a single, large lock in the L{JobQueue} class. It's
25 ea03467c Iustin Pop
used by all other classes in this module.
26 ea03467c Iustin Pop

27 ea03467c Iustin Pop
@var JOBQUEUE_THREADS: the number of worker threads we start for
28 ea03467c Iustin Pop
    processing jobs
29 6c5a7090 Michael Hanselmann

30 6c5a7090 Michael Hanselmann
"""
31 498ae1cc Iustin Pop
32 f1da30e6 Michael Hanselmann
import os
33 e2715f69 Michael Hanselmann
import logging
34 e2715f69 Michael Hanselmann
import threading
35 f1da30e6 Michael Hanselmann
import errno
36 f1da30e6 Michael Hanselmann
import re
37 f1048938 Iustin Pop
import time
38 5685c1a5 Michael Hanselmann
import weakref
39 498ae1cc Iustin Pop
40 e2715f69 Michael Hanselmann
from ganeti import constants
41 f1da30e6 Michael Hanselmann
from ganeti import serializer
42 e2715f69 Michael Hanselmann
from ganeti import workerpool
43 f1da30e6 Michael Hanselmann
from ganeti import opcodes
44 7a1ecaed Iustin Pop
from ganeti import errors
45 e2715f69 Michael Hanselmann
from ganeti import mcpu
46 7996a135 Iustin Pop
from ganeti import utils
47 04ab05ce Michael Hanselmann
from ganeti import jstore
48 c3f0a12f Iustin Pop
from ganeti import rpc
49 e2715f69 Michael Hanselmann
50 fbf0262f Michael Hanselmann
51 1daae384 Iustin Pop
# Number of worker threads started for processing jobs (passed to the
# worker pool constructor in _JobQueueWorkerPool)
JOBQUEUE_THREADS = 25
52 58b22b6e Michael Hanselmann
# NOTE(review): presumably the maximum number of archived jobs stored in
# one archive directory; not used in the visible part of this file -- confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53 e2715f69 Michael Hanselmann
54 498ae1cc Iustin Pop
55 9728ae5d Iustin Pop
class CancelJob(Exception):
  """Exception used to signal that the current job must be canceled.

  It is raised while an opcode is being started and is handled by the
  job worker, which then cancels the job through the queue.

  """
59 fbf0262f Michael Hanselmann
60 fbf0262f Michael Hanselmann
61 70552c46 Michael Hanselmann
def TimeStampNow():
  """Return the current time as a split timestamp.

  @rtype: tuple
  @return: the result of L{utils.SplitTime} applied to the current
      time, i.e. the time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)
69 70552c46 Michael Hanselmann
70 70552c46 Michael Hanselmann
71 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
  """Wrapper around a single opcode together with its execution state.

  @ivar input: the wrapped OpCode
  @ivar status: the current status of the opcode
  @ivar result: the result of the LU execution
  @ivar log: the execution log, a list of tuples of the form
      C{(log_serial, timestamp, level, message)}
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Wrap a freshly submitted opcode.

    The new object starts in the queued state, with no result, an
    empty log and no timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Rebuild a _QueuedOpCode from its serialized form.

    The constructor is bypassed; every attribute is taken from the
    serialized state. Missing timestamps are restored as None.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    # These keys are mandatory in the serialized form
    for name in ("status", "result", "log"):
      setattr(restored, name, state[name])
    # Timestamps are optional
    for name in ("start_timestamp", "end_timestamp"):
      setattr(restored, name, state.get(name))
    return restored

  def Serialize(self):
    """Convert this _QueuedOpCode into a plain dictionary.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    data = {"input": self.input.__getstate__()}
    for name in ("status", "result", "log",
                 "start_timestamp", "end_timestamp"):
      data[name] = getattr(self, name)
    return data
135 f1048938 Iustin Pop
136 e2715f69 Michael Hanselmann
137 e2715f69 Michael Hanselmann
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: the serial of the most recently added log entry
      (0 if nothing has been logged yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if the list of opcodes is empty

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes; it shares the queue's lock
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The log serial is not stored separately; recompute it as the
      # highest serial found in the restored opcode logs
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes; it shares the queue's lock
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    The queue reference, the log serial and the condition variable are
    not part of the serialized state.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - successful opcodes are skipped
      - the first opcode found which is canceling, canceled or finished
        with error determines the job status and stops the iteration
      - otherwise, the last opcode which is waiting for locks or running
        determines the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status (one of the C{constants.JOB_STATUS_*} values)

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job-level status, like all the other branches do
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected, in opcode order

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # The first element of each log entry is its serial number
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are expected only at the start of the list
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
335 34327f51 Iustin Pop
336 f1048938 Iustin Pop
337 85f03e0d Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
338 ea03467c Iustin Pop
  """The actual job workers.
339 ea03467c Iustin Pop

340 ea03467c Iustin Pop
  """
341 e92376d7 Iustin Pop
  def _NotifyStart(self):
342 e92376d7 Iustin Pop
    """Mark the opcode as running, not lock-waiting.
343 e92376d7 Iustin Pop

344 e92376d7 Iustin Pop
    This is called from the mcpu code as a notifier function, when the
345 e92376d7 Iustin Pop
    LU is finally about to start the Exec() method. Of course, to have
346 e92376d7 Iustin Pop
    end-user visible results, the opcode must be initially (before
347 e92376d7 Iustin Pop
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348 e92376d7 Iustin Pop

349 e92376d7 Iustin Pop
    """
350 e92376d7 Iustin Pop
    assert self.queue, "Queue attribute is missing"
351 e92376d7 Iustin Pop
    assert self.opcode, "Opcode attribute is missing"
352 e92376d7 Iustin Pop
353 e92376d7 Iustin Pop
    self.queue.acquire()
354 e92376d7 Iustin Pop
    try:
355 fbf0262f Michael Hanselmann
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356 fbf0262f Michael Hanselmann
                                    constants.OP_STATUS_CANCELING)
357 fbf0262f Michael Hanselmann
358 fbf0262f Michael Hanselmann
      # Cancel here if we were asked to
359 fbf0262f Michael Hanselmann
      if self.opcode.status == constants.OP_STATUS_CANCELING:
360 fbf0262f Michael Hanselmann
        raise CancelJob()
361 fbf0262f Michael Hanselmann
362 e92376d7 Iustin Pop
      self.opcode.status = constants.OP_STATUS_RUNNING
363 e92376d7 Iustin Pop
    finally:
364 e92376d7 Iustin Pop
      self.queue.release()
365 e92376d7 Iustin Pop
366 85f03e0d Michael Hanselmann
  def RunTask(self, job):
367 e2715f69 Michael Hanselmann
    """Job executor.
368 e2715f69 Michael Hanselmann

369 6c5a7090 Michael Hanselmann
    This functions processes a job. It is closely tied to the _QueuedJob and
370 6c5a7090 Michael Hanselmann
    _QueuedOpCode classes.
371 e2715f69 Michael Hanselmann

372 ea03467c Iustin Pop
    @type job: L{_QueuedJob}
373 ea03467c Iustin Pop
    @param job: the job to be processed
374 ea03467c Iustin Pop

375 e2715f69 Michael Hanselmann
    """
376 d21d09d6 Iustin Pop
    logging.info("Worker %s processing job %s",
377 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
378 5bdce580 Michael Hanselmann
    proc = mcpu.Processor(self.pool.queue.context)
379 e92376d7 Iustin Pop
    self.queue = queue = job.queue
380 e2715f69 Michael Hanselmann
    try:
381 85f03e0d Michael Hanselmann
      try:
382 85f03e0d Michael Hanselmann
        count = len(job.ops)
383 85f03e0d Michael Hanselmann
        for idx, op in enumerate(job.ops):
384 d21d09d6 Iustin Pop
          op_summary = op.input.Summary()
385 f6424741 Iustin Pop
          if op.status == constants.OP_STATUS_SUCCESS:
386 f6424741 Iustin Pop
            # this is a job that was partially completed before master
387 f6424741 Iustin Pop
            # daemon shutdown, so it can be expected that some opcodes
388 f6424741 Iustin Pop
            # are already completed successfully (if any did error
389 f6424741 Iustin Pop
            # out, then the whole job should have been aborted and not
390 f6424741 Iustin Pop
            # resubmitted for processing)
391 f6424741 Iustin Pop
            logging.info("Op %s/%s: opcode %s already processed, skipping",
392 f6424741 Iustin Pop
                         idx + 1, count, op_summary)
393 f6424741 Iustin Pop
            continue
394 85f03e0d Michael Hanselmann
          try:
395 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
396 d21d09d6 Iustin Pop
                         op_summary)
397 85f03e0d Michael Hanselmann
398 85f03e0d Michael Hanselmann
            queue.acquire()
399 85f03e0d Michael Hanselmann
            try:
400 df0fb067 Iustin Pop
              if op.status == constants.OP_STATUS_CANCELED:
401 df0fb067 Iustin Pop
                raise CancelJob()
402 fbf0262f Michael Hanselmann
              assert op.status == constants.OP_STATUS_QUEUED
403 85f03e0d Michael Hanselmann
              job.run_op_index = idx
404 e92376d7 Iustin Pop
              op.status = constants.OP_STATUS_WAITLOCK
405 85f03e0d Michael Hanselmann
              op.result = None
406 70552c46 Michael Hanselmann
              op.start_timestamp = TimeStampNow()
407 c56ec146 Iustin Pop
              if idx == 0: # first opcode
408 c56ec146 Iustin Pop
                job.start_timestamp = op.start_timestamp
409 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
410 85f03e0d Michael Hanselmann
411 38206f3c Iustin Pop
              input_opcode = op.input
412 85f03e0d Michael Hanselmann
            finally:
413 85f03e0d Michael Hanselmann
              queue.release()
414 85f03e0d Michael Hanselmann
415 dfe57c22 Michael Hanselmann
            def _Log(*args):
416 6c5a7090 Michael Hanselmann
              """Append a log entry.
417 6c5a7090 Michael Hanselmann

418 6c5a7090 Michael Hanselmann
              """
419 6c5a7090 Michael Hanselmann
              assert len(args) < 3
420 6c5a7090 Michael Hanselmann
421 6c5a7090 Michael Hanselmann
              if len(args) == 1:
422 6c5a7090 Michael Hanselmann
                log_type = constants.ELOG_MESSAGE
423 6c5a7090 Michael Hanselmann
                log_msg = args[0]
424 6c5a7090 Michael Hanselmann
              else:
425 6c5a7090 Michael Hanselmann
                log_type, log_msg = args
426 6c5a7090 Michael Hanselmann
427 6c5a7090 Michael Hanselmann
              # The time is split to make serialization easier and not lose
428 6c5a7090 Michael Hanselmann
              # precision.
429 6c5a7090 Michael Hanselmann
              timestamp = utils.SplitTime(time.time())
430 dfe57c22 Michael Hanselmann
431 6c5a7090 Michael Hanselmann
              queue.acquire()
432 dfe57c22 Michael Hanselmann
              try:
433 6c5a7090 Michael Hanselmann
                job.log_serial += 1
434 6c5a7090 Michael Hanselmann
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
435 6c5a7090 Michael Hanselmann
436 dfe57c22 Michael Hanselmann
                job.change.notifyAll()
437 dfe57c22 Michael Hanselmann
              finally:
438 6c5a7090 Michael Hanselmann
                queue.release()
439 dfe57c22 Michael Hanselmann
440 6c5a7090 Michael Hanselmann
            # Make sure not to hold lock while _Log is called
441 e92376d7 Iustin Pop
            self.opcode = op
442 e92376d7 Iustin Pop
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
443 85f03e0d Michael Hanselmann
444 85f03e0d Michael Hanselmann
            queue.acquire()
445 85f03e0d Michael Hanselmann
            try:
446 85f03e0d Michael Hanselmann
              op.status = constants.OP_STATUS_SUCCESS
447 85f03e0d Michael Hanselmann
              op.result = result
448 70552c46 Michael Hanselmann
              op.end_timestamp = TimeStampNow()
449 85f03e0d Michael Hanselmann
              queue.UpdateJobUnlocked(job)
450 85f03e0d Michael Hanselmann
            finally:
451 85f03e0d Michael Hanselmann
              queue.release()
452 85f03e0d Michael Hanselmann
453 d21d09d6 Iustin Pop
            logging.info("Op %s/%s: Successfully finished opcode %s",
454 d21d09d6 Iustin Pop
                         idx + 1, count, op_summary)
455 fbf0262f Michael Hanselmann
          except CancelJob:
456 fbf0262f Michael Hanselmann
            # Will be handled further up
457 fbf0262f Michael Hanselmann
            raise
458 85f03e0d Michael Hanselmann
          except Exception, err:
459 85f03e0d Michael Hanselmann
            queue.acquire()
460 85f03e0d Michael Hanselmann
            try:
461 85f03e0d Michael Hanselmann
              try:
462 85f03e0d Michael Hanselmann
                op.status = constants.OP_STATUS_ERROR
463 e6345c35 Iustin Pop
                if isinstance(err, errors.GenericError):
464 e6345c35 Iustin Pop
                  op.result = errors.EncodeException(err)
465 e6345c35 Iustin Pop
                else:
466 e6345c35 Iustin Pop
                  op.result = str(err)
467 70552c46 Michael Hanselmann
                op.end_timestamp = TimeStampNow()
468 0f6be82a Iustin Pop
                logging.info("Op %s/%s: Error in opcode %s: %s",
469 0f6be82a Iustin Pop
                             idx + 1, count, op_summary, err)
470 85f03e0d Michael Hanselmann
              finally:
471 85f03e0d Michael Hanselmann
                queue.UpdateJobUnlocked(job)
472 85f03e0d Michael Hanselmann
            finally:
473 85f03e0d Michael Hanselmann
              queue.release()
474 85f03e0d Michael Hanselmann
            raise
475 85f03e0d Michael Hanselmann
476 fbf0262f Michael Hanselmann
      except CancelJob:
477 fbf0262f Michael Hanselmann
        queue.acquire()
478 fbf0262f Michael Hanselmann
        try:
479 fbf0262f Michael Hanselmann
          queue.CancelJobUnlocked(job)
480 fbf0262f Michael Hanselmann
        finally:
481 fbf0262f Michael Hanselmann
          queue.release()
482 85f03e0d Michael Hanselmann
      except errors.GenericError, err:
483 85f03e0d Michael Hanselmann
        logging.exception("Ganeti exception")
484 85f03e0d Michael Hanselmann
      except:
485 85f03e0d Michael Hanselmann
        logging.exception("Unhandled exception")
486 e2715f69 Michael Hanselmann
    finally:
487 85f03e0d Michael Hanselmann
      queue.acquire()
488 85f03e0d Michael Hanselmann
      try:
489 65548ed5 Michael Hanselmann
        try:
490 ed21712b Iustin Pop
          job.run_op_index = -1
491 c56ec146 Iustin Pop
          job.end_timestamp = TimeStampNow()
492 65548ed5 Michael Hanselmann
          queue.UpdateJobUnlocked(job)
493 65548ed5 Michael Hanselmann
        finally:
494 65548ed5 Michael Hanselmann
          job_id = job.id
495 65548ed5 Michael Hanselmann
          status = job.CalcStatus()
496 85f03e0d Michael Hanselmann
      finally:
497 85f03e0d Michael Hanselmann
        queue.release()
498 d21d09d6 Iustin Pop
      logging.info("Worker %s finished job %s, status = %s",
499 d21d09d6 Iustin Pop
                   self.worker_id, job_id, status)
500 e2715f69 Michael Hanselmann
501 e2715f69 Michael Hanselmann
502 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool used for processing jobs.

  @ivar queue: the job queue this pool belongs to

  """
  def __init__(self, queue):
    """Initialize the pool and remember the owning queue.

    The base class is initialized with C{JOBQUEUE_THREADS} and the
    L{_JobQueueWorker} class.

    @param queue: the job queue this pool belongs to

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
510 e2715f69 Michael Hanselmann
511 e2715f69 Michael Hanselmann
512 caeffaa0 Iustin Pop
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  The returned wrapper keeps the name, docstring, module and function
  attributes of the decorated method, so that documentation tools and
  tracebacks still show the original method.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  @param fn: the method to wrap
  @return: a wrapper which asserts that the queue is open and then
      calls the wrapped method

  """
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)

  # Copy the metadata by hand instead of using functools.wraps, as the
  # rest of this module sticks to older Python constructs
  for attr in ("__module__", "__name__", "__doc__"):
    try:
      setattr(wrapper, attr, getattr(fn, attr))
    except AttributeError:
      # Not every callable provides all of these attributes
      pass
  wrapper.__dict__.update(getattr(fn, "__dict__", {}))

  return wrapper
534 db37da70 Michael Hanselmann
535 db37da70 Michael Hanselmann
536 caeffaa0 Iustin Pop
class JobQueue(object):
537 caeffaa0 Iustin Pop
  """Queue used to manage the jobs.
538 db37da70 Michael Hanselmann

539 caeffaa0 Iustin Pop
  @cvar _RE_JOB_FILE: regex matching the valid job file names
540 caeffaa0 Iustin Pop

541 caeffaa0 Iustin Pop
  """
542 caeffaa0 Iustin Pop
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
543 db37da70 Michael Hanselmann
544 85f03e0d Michael Hanselmann
  def __init__(self, context):
    """Constructor for JobQueue.

    Sets up the locking, opens and verifies the on-disk queue, records
    the master candidates and finally walks over all job files found
    on disk: queued jobs are handed to the worker pool, while jobs
    that were running, waiting for locks or being canceled get their
    unfinished opcodes marked with an error status.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking; acquire/release are exposed directly on the queue object
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes: master candidates only
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # Remove master node, if it is listed at all
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        last_report = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            last_report = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)
            continue

          if status not in (constants.JOB_STATUS_RUNNING,
                            constants.JOB_STATUS_WAITLOCK,
                            constants.JOB_STATUS_CANCELING):
            # nothing to do for any other status
            continue

          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")
          finally:
            # the job is written back even if marking the opcodes failed
            self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Whatever went wrong, don't leave worker threads behind
      self._wpool.TerminateWorkers()
      raise
634 85f03e0d Michael Hanselmann
635 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A node which is
    not a master candidate is then only dropped from the list of
    known nodes; a master candidate receives all current job files
    plus the serial file and is added to the list of known nodes.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication
      # of the job ids
      self._nodes.pop(node_name, None)
      return

    # Upload the whole queue excluding archived jobs...
    upload_paths = [self._GetJobPath(job_id)
                    for job_id in self._GetJobIDsUnlocked()]
    # ... followed by the current serial file
    upload_paths.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload_paths:
      # Read file content
      handle = open(path, "r")
      try:
        content = handle.read()
      finally:
        handle.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  path, content)
      upload_ok = result[node_name]
      if not upload_ok:
        logging.error("Failed to upload %s to %s", path, node_name)

    self._nodes[node_name] = node.primary_ip
677 d2e03a33 Michael Hanselmann
678 d2e03a33 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)
692 23752136 Michael Hanselmann
693 e74798c1 Michael Hanselmann
  def _CheckRpcResult(self, result, nodes, failmsg):
694 ea03467c Iustin Pop
    """Verifies the status of an RPC call.
695 ea03467c Iustin Pop

696 ea03467c Iustin Pop
    Since we aim to keep consistency should this node (the current
697 ea03467c Iustin Pop
    master) fail, we will log errors if our rpc fail, and especially
698 5bbd3f7f Michael Hanselmann
    log the case when more than half of the nodes fails.
699 ea03467c Iustin Pop

700 ea03467c Iustin Pop
    @param result: the data as returned from the rpc call
701 ea03467c Iustin Pop
    @type nodes: list
702 ea03467c Iustin Pop
    @param nodes: the list of nodes we made the call to
703 ea03467c Iustin Pop
    @type failmsg: str
704 ea03467c Iustin Pop
    @param failmsg: the identifier to be used for logging
705 ea03467c Iustin Pop

706 ea03467c Iustin Pop
    """
707 e74798c1 Michael Hanselmann
    failed = []
708 e74798c1 Michael Hanselmann
    success = []
709 e74798c1 Michael Hanselmann
710 e74798c1 Michael Hanselmann
    for node in nodes:
711 e74798c1 Michael Hanselmann
      if result[node]:
712 e74798c1 Michael Hanselmann
        success.append(node)
713 e74798c1 Michael Hanselmann
      else:
714 e74798c1 Michael Hanselmann
        failed.append(node)
715 e74798c1 Michael Hanselmann
716 e74798c1 Michael Hanselmann
    if failed:
717 e74798c1 Michael Hanselmann
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
718 e74798c1 Michael Hanselmann
719 e74798c1 Michael Hanselmann
    # +1 for the master node
720 e74798c1 Michael Hanselmann
    if (len(success) + 1) < len(failed):
721 e74798c1 Michael Hanselmann
      # TODO: Handle failing nodes
722 e74798c1 Michael Hanselmann
      logging.error("More than half of the nodes failed")
723 e74798c1 Michael Hanselmann
724 99aabbed Iustin Pop
  def _GetNodeIp(self):
725 99aabbed Iustin Pop
    """Helper for returning the node name/ip list.
726 99aabbed Iustin Pop

727 ea03467c Iustin Pop
    @rtype: (list, list)
728 ea03467c Iustin Pop
    @return: a tuple of two lists, the first one with the node
729 ea03467c Iustin Pop
        names and the second one with the node addresses
730 ea03467c Iustin Pop

731 99aabbed Iustin Pop
    """
732 99aabbed Iustin Pop
    name_list = self._nodes.keys()
733 99aabbed Iustin Pop
    addr_list = [self._nodes[name] for name in name_list]
734 99aabbed Iustin Pop
    return name_list, addr_list
735 99aabbed Iustin Pop
736 8e00939c Michael Hanselmann
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The local copy is written first; afterwards the same contents are
    sent to every node in C{self._nodes}. Replication failures are
    logged via L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    failmsg = "Updating %s" % file_name
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
754 23752136 Michael Hanselmann
755 d7fd1f28 Michael Hanselmann
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    Every (old, new) pair is renamed on this node first; afterwards
    the whole list is sent to all nodes in C{self._nodes}.
    Replication failures are logged via L{_CheckRpcResult}.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first
    for (old_name, new_name) in rename:
      utils.RenameFile(old_name, new_name, mkdir=True)

    # ... then the same on all other nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    failmsg = "Renaming files (%r)" % rename
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
773 abc1f2ce Michael Hanselmann
774 85f03e0d Michael Hanselmann
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    The ID is validated and then converted with C{str(job_id)};
    having a single place for this allows changing the job ID format
    later.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or
        is negative

    """
    is_integral = isinstance(job_id, (int, long))
    if not is_integral:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    elif job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
793 85f03e0d Michael Hanselmann
794 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into directories of C{JOBS_PER_ARCHIVE_DIRECTORY}
    jobs each; the directory name is the integer quotient of the job
    ID and that constant.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: the directory name has to be an integer
    # no matter which division semantics are in effect for "/"
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
805 58b22b6e Michael Hanselmann
806 009e73d0 Iustin Pop
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The
    new serial number is written to the serial file (and replicated)
    before the in-memory counter is advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} formatted job identifiers,
        none of which has been handed out before

    """
    assert count > 0
    # self._last_serial is the last identifier already handed out, so
    # the new ones are last + 1 .. last + count
    first_new = self._last_serial + 1
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(first_new, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
831 f1da30e6 Michael Hanselmann
832 85f03e0d Michael Hanselmann
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file, i.e. C{job-<id>} inside the
        queue directory

    """
    queue_dir = constants.QUEUE_DIR
    file_name = "job-%s" % job_id
    return os.path.join(queue_dir, file_name)
843 f1da30e6 Michael Hanselmann
844 58b22b6e Michael Hanselmann
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file, located in the job's
        archive directory below the queue archive directory

    """
    archive_dir = cls._GetArchiveDirectory(job_id)
    relative_path = "%s/job-%s" % (archive_dir, job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, relative_path)
856 0cb94105 Michael Hanselmann
857 85f03e0d Michael Hanselmann
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    The name is matched against L{_RE_JOB_FILE}; the first group of
    the match is the job id.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    match = cls._RE_JOB_FILE.match(name)
    if match is None:
      return None
    return match.group(1)
874 fae737ac Michael Hanselmann
875 911a495b Iustin Pop
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    Only the job files on disk are looked at, as it's a requirement
    that all jobs are present on disk (so the _memcache doesn't hold
    any extra IDs).

    @type archived: boolean
    @param archived: meant to include the IDs of archived jobs when
        True; currently this argument is unused
    @rtype: list
    @return: the list of job IDs, sorted using C{utils.NiceSort}

    """
    job_ids = [self._ExtractJobID(file_name)
               for file_name in self._ListJobFiles()]
    return utils.NiceSort(job_ids)
892 911a495b Iustin Pop
893 f1da30e6 Michael Hanselmann
  def _ListJobFiles(self):
    """Returns the list of current job files.

    These are the visible files in the queue directory whose name
    matches L{_RE_JOB_FILE}.

    @rtype: list
    @return: the list of job file names

    """
    job_files = []
    for file_name in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(file_name):
        job_files.append(file_name)
    return job_files
902 f1da30e6 Michael Hanselmann
903 911a495b Iustin Pop
  def _LoadJobUnlocked(self, job_id):
904 ea03467c Iustin Pop
    """Loads a job from the disk or memory.
905 ea03467c Iustin Pop

906 ea03467c Iustin Pop
    Given a job id, this will return the cached job object if
907 ea03467c Iustin Pop
    existing, or try to load the job from the disk. If loading from
908 ea03467c Iustin Pop
    disk, it will also add the job to the cache.
909 ea03467c Iustin Pop

910 ea03467c Iustin Pop
    @param job_id: the job id
911 ea03467c Iustin Pop
    @rtype: L{_QueuedJob} or None
912 ea03467c Iustin Pop
    @return: either None or the job object
913 ea03467c Iustin Pop

914 ea03467c Iustin Pop
    """
915 5685c1a5 Michael Hanselmann
    job = self._memcache.get(job_id, None)
916 5685c1a5 Michael Hanselmann
    if job:
917 205d71fd Michael Hanselmann
      logging.debug("Found job %s in memcache", job_id)
918 5685c1a5 Michael Hanselmann
      return job
919 ac0930b9 Iustin Pop
920 911a495b Iustin Pop
    filepath = self._GetJobPath(job_id)
921 f1da30e6 Michael Hanselmann
    logging.debug("Loading job from %s", filepath)
922 f1da30e6 Michael Hanselmann
    try:
923 f1da30e6 Michael Hanselmann
      fd = open(filepath, "r")
924 f1da30e6 Michael Hanselmann
    except IOError, err:
925 f1da30e6 Michael Hanselmann
      if err.errno in (errno.ENOENT, ):
926 f1da30e6 Michael Hanselmann
        return None
927 f1da30e6 Michael Hanselmann
      raise
928 f1da30e6 Michael Hanselmann
    try:
929 f1da30e6 Michael Hanselmann
      data = serializer.LoadJson(fd.read())
930 f1da30e6 Michael Hanselmann
    finally:
931 f1da30e6 Michael Hanselmann
      fd.close()
932 f1da30e6 Michael Hanselmann
933 94ed59a5 Iustin Pop
    try:
934 94ed59a5 Iustin Pop
      job = _QueuedJob.Restore(self, data)
935 94ed59a5 Iustin Pop
    except Exception, err:
936 94ed59a5 Iustin Pop
      new_path = self._GetArchivedJobPath(job_id)
937 94ed59a5 Iustin Pop
      if filepath == new_path:
938 94ed59a5 Iustin Pop
        # job already archived (future case)
939 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s", job_id)
940 94ed59a5 Iustin Pop
      else:
941 94ed59a5 Iustin Pop
        # non-archived case
942 94ed59a5 Iustin Pop
        logging.exception("Can't parse job %s, will archive.", job_id)
943 d7fd1f28 Michael Hanselmann
        self._RenameFilesUnlocked([(filepath, new_path)])
944 94ed59a5 Iustin Pop
      return None
945 94ed59a5 Iustin Pop
946 ac0930b9 Iustin Pop
    self._memcache[job_id] = job
947 205d71fd Michael Hanselmann
    logging.debug("Added job %s to the cache", job_id)
948 ac0930b9 Iustin Pop
    return job
949 f1da30e6 Michael Hanselmann
950 f1da30e6 Michael Hanselmann
  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects, in the order of the IDs; an
        entry is None where L{_LoadJobUnlocked} returned None

    """
    wanted_ids = job_ids
    if not wanted_ids:
      wanted_ids = self._GetJobIDsUnlocked()

    jobs = []
    for job_id in wanted_ids:
      jobs.append(self._LoadJobUnlocked(job_id))
    return jobs
964 f1da30e6 Michael Hanselmann
965 686d7433 Iustin Pop
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    The check is based on the existence of the queue drain file,
    which makes it a per-node flag. In the future this can be moved
    to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)
977 686d7433 Iustin Pop
978 3ccafd0e Iustin Pop
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    Setting the flag writes an empty drain file, unsetting it removes
    the file. This is similar to the function
    L{backend.JobQueueSetDrainFlag}, and in the future we might merge
    them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    if not drain_flag:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    else:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    return True
994 3ccafd0e Iustin Pop
995 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    The job is written to disk (and replicated), entered into the
    cache and then handed to the worker pool, in order for it to be
    picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for
        draining
    @raise errors.JobQueueFull: if the number of job files has reached
        the hard limit

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    queue_size = len(self._ListJobFiles())
    if queue_size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #queue_size = ...
      pass

    if queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # The job has to be on disk before it becomes visible anywhere else
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    # Finally let the worker pool know about it
    self._wpool.AddTask(new_job)

    return new_job.id
1037 f1da30e6 Michael Hanselmann
1038 56d8ff91 Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    A single new job ID is requested and used for the submission.

    @see: L{_SubmitJobUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(1)
    return self._SubmitJobUnlocked(new_ids[0], ops)
1048 56d8ff91 Iustin Pop
1049 56d8ff91 Iustin Pop
  @utils.LockedMethod
1050 56d8ff91 Iustin Pop
  @_RequireOpenQueue
1051 56d8ff91 Iustin Pop
  def SubmitManyJobs(self, jobs):
1052 56d8ff91 Iustin Pop
    """Create and store multiple jobs.
1053 56d8ff91 Iustin Pop

1054 56d8ff91 Iustin Pop
    @see: L{_SubmitJobUnlocked}
1055 56d8ff91 Iustin Pop

1056 56d8ff91 Iustin Pop
    """
1057 56d8ff91 Iustin Pop
    results = []
1058 009e73d0 Iustin Pop
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1059 009e73d0 Iustin Pop
    for job_id, ops in zip(all_job_ids, jobs):
1060 56d8ff91 Iustin Pop
      try:
1061 009e73d0 Iustin Pop
        data = self._SubmitJobUnlocked(job_id, ops)
1062 56d8ff91 Iustin Pop
        status = True
1063 56d8ff91 Iustin Pop
      except errors.GenericError, err:
1064 56d8ff91 Iustin Pop
        data = str(err)
1065 56d8ff91 Iustin Pop
        status = False
1066 56d8ff91 Iustin Pop
      results.append((status, data))
1067 56d8ff91 Iustin Pop
1068 56d8ff91 Iustin Pop
    return results
1069 56d8ff91 Iustin Pop
1070 56d8ff91 Iustin Pop
1071 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    This needs to be called after a job has been modified: the job is
    serialized, written to its job file and replicated to the other
    nodes, and everyone waiting on the job's C{change} condition is
    notified.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Notify waiters about potential changes
    job.change.notifyAll()
1090 dfe57c22 Michael Hanselmann
1091 6c5a7090 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Block until a job differs from what the caller last saw.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list;
        C{None} if the job could not be loaded at all;
        L{constants.JOB_NOTCHANGED} if the timeout expired before
        any change was seen, which clients must interpret as such

    """
    logging.debug("Waiting for changes in job %s", job_id)

    def _JsonRoundTrip(value):
      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. Applying the same transformation
      # here gives us data shaped like what the client sent back; without
      # it the comparison below might fail without the data being
      # significantly different.
      return serializer.LoadJson(serializer.DumpJson(value))

    job_info = None
    log_entries = None
    deadline = time.time() + timeout

    while True:
      remaining = deadline - time.time()
      if remaining < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      raw_info = self._GetJobInfoUnlocked(job, fields)
      raw_entries = job.GetLogEntries(prev_log_serial)

      job_info = _JsonRoundTrip(raw_info)
      log_entries = _JsonRoundTrip(raw_entries)

      # Stop as soon as the job is no longer queued/running/waiting (there
      # will be no further changes to wait for), or when the job info or the
      # first new log entry differs from what the caller already has.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(remaining)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None

    return (job_info, log_entries)
1167 dfe57c22 Michael Hanselmann
1168 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Request cancellation of a job.

    Only jobs whose status is still queued or waiting for locks can be
    cancelled; any other status is refused.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: (success flag, human-readable message)

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      # Not picked up yet: mark it canceled right away
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Every other status means the job has left the waiting stage
    logging.debug("Job %s is no longer waiting in the queue", job.id)
    return (False, "Job %s is no longer waiting in the queue" % job.id)
1204 fbf0262f Michael Hanselmann
1205 fbf0262f Michael Hanselmann
  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Flag all unfinished opcodes of a job as canceled and save the job.

    The job is written out through L{UpdateJobUnlocked} even if marking
    the opcodes raises an exception.

    @type job: L{_QueuedJob}
    @param job: the job to cancel

    """
    reason = "Job canceled by request"
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, reason)
    finally:
      self.UpdateJobUnlocked(job)
1215 188c5e0a Michael Hanselmann
1216 db37da70 Michael Hanselmann
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    Only jobs whose status is canceled, success or error are archived;
    all others are skipped. Archiving renames the job file from its queue
    path to its archive path via L{_RenameFilesUnlocked}.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    if not archive_jobs:
      # Nothing is eligible: skip the (pointless) empty rename request and
      # don't claim success for an empty list of jobs.
      return 0

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    # str() keeps the join working even if job IDs are not plain strings
    logging.debug("Successfully archived job(s) %s",
                  ", ".join([str(job.id) for job in archive_jobs]))

    return len(archive_jobs)
1248 78d12585 Michael Hanselmann
1249 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archive a single job identified by its ID.

    Thin wrapper which loads the job and passes it to
    L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job:
      # Exactly one job was submitted, so a count of one means success
      archived = self._ArchiveJobsUnlocked([job])
      return archived == 1

    logging.debug("Job %s not found", job_id)
    return False
1270 07cd723a Iustin Pop
1271 07cd723a Iustin Pop
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered, and if that is missing as well the
    received timestamp. The special '-1' age will cause archival of all
    jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: maximum time, in seconds, to spend examining jobs
    @rtype: tuple
    @return: (number of archived jobs, number of jobs which were not
        examined because the timeout expired)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    # Number of job IDs we actually got around to looking at; the job on
    # which the timeout is detected is NOT counted, as it is never loaded.
    examined = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for job_id in all_job_ids:
      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      examined += 1

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if not job:
        continue

      if job.end_timestamp is not None:
        job_age = job.end_timestamp
      elif job.start_timestamp is not None:
        job_age = job.start_timestamp
      else:
        job_age = job.received_timestamp

      # Only the first element of the timestamp is compared against "now"
      # (presumably the seconds part of a (seconds, microseconds) pair)
      if age == -1 or now - job_age[0] > age:
        pending.append(job)

        # Archive 10 jobs at a time
        if len(pending) >= 10:
          archived_count += self._ArchiveJobsUnlocked(pending)
          pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    # Never negative (empty queue yields 0) and includes the job on which
    # the timeout was hit.
    return (archived_count, len(all_job_ids) - examined)
1326 07cd723a Iustin Pop
1327 85f03e0d Michael Hanselmann
  def _GetJobInfoUnlocked(self, job, fields):
    """Build the list of requested field values for one job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field, in the order given
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    # One lazily-evaluated getter per supported field name
    getters = {
      "id": lambda: job.id,
      "status": lambda: job.CalcStatus(),
      "ops": lambda: [op.input.__getstate__() for op in job.ops],
      "opresult": lambda: [op.result for op in job.ops],
      "opstatus": lambda: [op.status for op in job.ops],
      "oplog": lambda: [op.log for op in job.ops],
      "opstart": lambda: [op.start_timestamp for op in job.ops],
      "opend": lambda: [op.end_timestamp for op in job.ops],
      "received_ts": lambda: job.received_timestamp,
      "start_ts": lambda: job.start_timestamp,
      "end_ts": lambda: job.end_timestamp,
      "summary": lambda: [op.input.Summary() for op in job.ops],
      }

    values = []
    for fname in fields:
      try:
        getter = getters[fname]
      except (KeyError, TypeError):
        # Unknown (or unhashable, hence unmatched) field name
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
      values.append(getter())

    return values
1369 e2715f69 Michael Hanselmann
1370 85f03e0d Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Return the requested fields for a set of jobs.

    The jobs are obtained from L{_GetJobsUnlocked}; each one is then
    described via L{_GetJobInfoUnlocked}.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields, or None for a job that came back as None

    """
    def _Describe(job):
      # Jobs reported as None stay None in the result
      if job is None:
        return None
      return self._GetJobInfoUnlocked(job, fields)

    return [_Describe(job) for job in self._GetJobsUnlocked(job_ids)]
1396 e2715f69 Michael Hanselmann
1397 f1da30e6 Michael Hanselmann
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This terminates the workers of the worker pool and then closes the
    queue lock.

    """
    self._wpool.TerminateWorkers()

    # Close the queue lock and drop our reference to it
    self._queue_lock.Close()
    self._queue_lock = None