Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 8572f1fe

History | View | Annotate | Download (42.9 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 989a8bee Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 989a8bee Michael Hanselmann
32 989a8bee Michael Hanselmann
from ganeti import constants
33 989a8bee Michael Hanselmann
from ganeti import utils
34 989a8bee Michael Hanselmann
from ganeti import errors
35 989a8bee Michael Hanselmann
from ganeti import jqueue
36 8f5c488d Michael Hanselmann
from ganeti import opcodes
37 8f5c488d Michael Hanselmann
from ganeti import compat
38 26d3fd2f Michael Hanselmann
from ganeti import mcpu
39 989a8bee Michael Hanselmann
40 989a8bee Michael Hanselmann
import testutils
41 989a8bee Michael Hanselmann
42 989a8bee Michael Hanselmann
43 989a8bee Michael Hanselmann
class _FakeJob:
44 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
45 989a8bee Michael Hanselmann
    self.id = job_id
46 989a8bee Michael Hanselmann
    self._status = status
47 989a8bee Michael Hanselmann
    self._log = []
48 989a8bee Michael Hanselmann
49 989a8bee Michael Hanselmann
  def SetStatus(self, status):
50 989a8bee Michael Hanselmann
    self._status = status
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
53 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
54 989a8bee Michael Hanselmann
55 989a8bee Michael Hanselmann
  def CalcStatus(self):
56 989a8bee Michael Hanselmann
    return self._status
57 989a8bee Michael Hanselmann
58 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
59 989a8bee Michael Hanselmann
    result = []
60 989a8bee Michael Hanselmann
61 989a8bee Michael Hanselmann
    for name in fields:
62 989a8bee Michael Hanselmann
      if name == "status":
63 989a8bee Michael Hanselmann
        result.append(self._status)
64 989a8bee Michael Hanselmann
      else:
65 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
66 989a8bee Michael Hanselmann
67 989a8bee Michael Hanselmann
    return result
68 989a8bee Michael Hanselmann
69 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
70 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    if newer_than is None:
73 989a8bee Michael Hanselmann
      return self._log
74 989a8bee Michael Hanselmann
75 989a8bee Michael Hanselmann
    return self._log[newer_than:]
76 989a8bee Michael Hanselmann
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
79 989a8bee Michael Hanselmann
  def testStatus(self):
80 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
82 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83 989a8bee Michael Hanselmann
84 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86 989a8bee Michael Hanselmann
87 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89 989a8bee Michael Hanselmann
90 989a8bee Michael Hanselmann
    # job.id is used by checker
91 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
92 989a8bee Michael Hanselmann
93 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
94 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
96 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
97 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
98 989a8bee Michael Hanselmann
99 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101 989a8bee Michael Hanselmann
102 989a8bee Michael Hanselmann
  def testFinalStatus(self):
103 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
104 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
105 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
107 989a8bee Michael Hanselmann
      # a change immediately
108 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
109 989a8bee Michael Hanselmann
110 989a8bee Michael Hanselmann
  def testLog(self):
111 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
113 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114 989a8bee Michael Hanselmann
115 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
116 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
117 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
119 989a8bee Michael Hanselmann
120 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
122 989a8bee Michael Hanselmann
123 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
124 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
127 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129 989a8bee Michael Hanselmann
130 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
132 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134 989a8bee Michael Hanselmann
135 989a8bee Michael Hanselmann
136 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
137 989a8bee Michael Hanselmann
  def setUp(self):
138 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
139 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
141 989a8bee Michael Hanselmann
142 989a8bee Michael Hanselmann
  def tearDown(self):
143 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
144 989a8bee Michael Hanselmann
145 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
146 989a8bee Michael Hanselmann
    try:
147 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
148 989a8bee Michael Hanselmann
    except EnvironmentError, err:
149 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
150 989a8bee Michael Hanselmann
    else:
151 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
152 989a8bee Michael Hanselmann
153 989a8bee Michael Hanselmann
  def testClose(self):
154 989a8bee Michael Hanselmann
    for wait in [False, True]:
155 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156 989a8bee Michael Hanselmann
      try:
157 989a8bee Michael Hanselmann
        if wait:
158 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
159 989a8bee Michael Hanselmann
      finally:
160 989a8bee Michael Hanselmann
        waiter.Close()
161 989a8bee Michael Hanselmann
162 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
163 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
164 989a8bee Michael Hanselmann
165 989a8bee Michael Hanselmann
  def testChangingFile(self):
166 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167 989a8bee Michael Hanselmann
    try:
168 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
169 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
170 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
171 989a8bee Michael Hanselmann
    finally:
172 989a8bee Michael Hanselmann
      waiter.Close()
173 989a8bee Michael Hanselmann
174 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
175 989a8bee Michael Hanselmann
176 989a8bee Michael Hanselmann
  def testChangingFile2(self):
177 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
178 989a8bee Michael Hanselmann
    try:
179 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
180 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
181 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
182 989a8bee Michael Hanselmann
183 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
184 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
185 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
186 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
187 989a8bee Michael Hanselmann
    finally:
188 989a8bee Michael Hanselmann
      waiter.Close()
189 989a8bee Michael Hanselmann
190 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191 989a8bee Michael Hanselmann
192 989a8bee Michael Hanselmann
193 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
194 989a8bee Michael Hanselmann
  def setUp(self):
195 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
196 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
198 989a8bee Michael Hanselmann
199 989a8bee Michael Hanselmann
  def tearDown(self):
200 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
201 989a8bee Michael Hanselmann
202 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
203 989a8bee Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204 989a8bee Michael Hanselmann
205 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
206 989a8bee Michael Hanselmann
    return None
207 989a8bee Michael Hanselmann
208 989a8bee Michael Hanselmann
  def testNoChanges(self):
209 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
210 989a8bee Michael Hanselmann
211 989a8bee Michael Hanselmann
    # No change
212 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213 989a8bee Michael Hanselmann
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
215 989a8bee Michael Hanselmann
216 989a8bee Michael Hanselmann
    # No previous information
217 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
219 989a8bee Michael Hanselmann
                     ([constants.JOB_STATUS_WAITLOCK], []))
220 989a8bee Michael Hanselmann
221 989a8bee Michael Hanselmann
  def testLostJob(self):
222 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
223 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
227 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
228 6760e4ed Michael Hanselmann
  def test(self):
229 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
231 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232 6760e4ed Michael Hanselmann
233 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
235 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236 6760e4ed Michael Hanselmann
237 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
239 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240 6760e4ed Michael Hanselmann
241 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
242 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
243 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244 6760e4ed Michael Hanselmann
245 6760e4ed Michael Hanselmann
246 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
247 8f5c488d Michael Hanselmann
  def testDefaults(self):
248 8f5c488d Michael Hanselmann
    def _Check(op):
249 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
250 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
252 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
253 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
254 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
255 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
256 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257 8f5c488d Michael Hanselmann
258 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259 8f5c488d Michael Hanselmann
    _Check(op1)
260 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261 8f5c488d Michael Hanselmann
    _Check(op2)
262 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
263 8f5c488d Michael Hanselmann
264 8f5c488d Michael Hanselmann
  def testPriority(self):
265 8f5c488d Michael Hanselmann
    def _Check(op):
266 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
268 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270 8f5c488d Michael Hanselmann
271 8f5c488d Michael Hanselmann
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
272 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
273 8f5c488d Michael Hanselmann
    _Check(op1)
274 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275 8f5c488d Michael Hanselmann
    _Check(op2)
276 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
277 8f5c488d Michael Hanselmann
278 8f5c488d Michael Hanselmann
279 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
280 5f6b0b71 Michael Hanselmann
  def test(self):
281 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282 5f6b0b71 Michael Hanselmann
                      None, 1, [])
283 5f6b0b71 Michael Hanselmann
284 8f5c488d Michael Hanselmann
  def testDefaults(self):
285 8f5c488d Michael Hanselmann
    job_id = 4260
286 8f5c488d Michael Hanselmann
    ops = [
287 8f5c488d Michael Hanselmann
      opcodes.OpGetTags(),
288 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
289 8f5c488d Michael Hanselmann
      ]
290 8f5c488d Michael Hanselmann
291 8f5c488d Michael Hanselmann
    def _Check(job):
292 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
293 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
294 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
295 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
296 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
297 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
300 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
301 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
303 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
304 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
305 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
306 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
307 8f5c488d Michael Hanselmann
308 8f5c488d Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops)
309 8f5c488d Michael Hanselmann
    _Check(job1)
310 8f5c488d Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311 8f5c488d Michael Hanselmann
    _Check(job2)
312 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
313 8f5c488d Michael Hanselmann
314 8f5c488d Michael Hanselmann
  def testPriority(self):
315 8f5c488d Michael Hanselmann
    job_id = 4283
316 8f5c488d Michael Hanselmann
    ops = [
317 8f5c488d Michael Hanselmann
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
318 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
319 8f5c488d Michael Hanselmann
      ]
320 8f5c488d Michael Hanselmann
321 8f5c488d Michael Hanselmann
    def _Check(job):
322 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
323 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
325 8f5c488d Michael Hanselmann
326 8f5c488d Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops)
327 8f5c488d Michael Hanselmann
    _Check(job)
328 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329 8f5c488d Michael Hanselmann
                            for op in job.ops))
330 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331 8f5c488d Michael Hanselmann
332 8f5c488d Michael Hanselmann
    # Increase first
333 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
334 8f5c488d Michael Hanselmann
    _Check(job)
335 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336 8f5c488d Michael Hanselmann
337 8f5c488d Michael Hanselmann
    # Mark opcode as finished
338 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339 8f5c488d Michael Hanselmann
    _Check(job)
340 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341 8f5c488d Michael Hanselmann
342 8f5c488d Michael Hanselmann
    # Increase second
343 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
344 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345 8f5c488d Michael Hanselmann
346 8f5c488d Michael Hanselmann
    # Test increasing first
347 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
348 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
349 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350 8f5c488d Michael Hanselmann
351 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
352 db5bce34 Michael Hanselmann
    def _Queued(ops):
353 db5bce34 Michael Hanselmann
      # The default status is "queued"
354 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355 db5bce34 Michael Hanselmann
                              for op in ops))
356 db5bce34 Michael Hanselmann
357 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
358 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITLOCK
359 db5bce34 Michael Hanselmann
360 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
361 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
362 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
363 db5bce34 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITLOCK
364 db5bce34 Michael Hanselmann
365 db5bce34 Michael Hanselmann
    def _Running(ops):
366 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
367 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
368 db5bce34 Michael Hanselmann
      for op in ops[2:]:
369 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
370 db5bce34 Michael Hanselmann
371 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
372 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
373 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
374 db5bce34 Michael Hanselmann
      for op in ops[2:]:
375 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
376 db5bce34 Michael Hanselmann
377 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
378 db5bce34 Michael Hanselmann
      for op in ops:
379 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
380 db5bce34 Michael Hanselmann
381 db5bce34 Michael Hanselmann
    def _Canceled(ops):
382 db5bce34 Michael Hanselmann
      for op in ops:
383 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
384 db5bce34 Michael Hanselmann
385 db5bce34 Michael Hanselmann
    def _Error1(ops):
386 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
387 db5bce34 Michael Hanselmann
        if idx > 3:
388 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
389 db5bce34 Michael Hanselmann
        else:
390 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
391 db5bce34 Michael Hanselmann
392 db5bce34 Michael Hanselmann
    def _Error2(ops):
393 db5bce34 Michael Hanselmann
      for op in ops:
394 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
395 db5bce34 Michael Hanselmann
396 db5bce34 Michael Hanselmann
    def _Success(ops):
397 db5bce34 Michael Hanselmann
      for op in ops:
398 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
399 db5bce34 Michael Hanselmann
400 db5bce34 Michael Hanselmann
    tests = {
401 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
402 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
404 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
406 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
408 db5bce34 Michael Hanselmann
      }
409 db5bce34 Michael Hanselmann
410 db5bce34 Michael Hanselmann
    def _NewJob():
411 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
412 db5bce34 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)])
413 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415 db5bce34 Michael Hanselmann
                              for op in job.ops))
416 db5bce34 Michael Hanselmann
      return job
417 db5bce34 Michael Hanselmann
418 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
419 db5bce34 Michael Hanselmann
      sttests = tests[status]
420 db5bce34 Michael Hanselmann
      assert sttests
421 db5bce34 Michael Hanselmann
      for fn in sttests:
422 db5bce34 Michael Hanselmann
        job = _NewJob()
423 db5bce34 Michael Hanselmann
        fn(job.ops)
424 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
425 db5bce34 Michael Hanselmann
426 8f5c488d Michael Hanselmann
427 be760ba8 Michael Hanselmann
class _FakeQueueForProc:
428 be760ba8 Michael Hanselmann
  def __init__(self):
429 be760ba8 Michael Hanselmann
    self._acquired = False
430 ebb2a2a3 Michael Hanselmann
    self._updates = []
431 be760ba8 Michael Hanselmann
432 be760ba8 Michael Hanselmann
  def IsAcquired(self):
433 be760ba8 Michael Hanselmann
    return self._acquired
434 be760ba8 Michael Hanselmann
435 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
436 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
437 ebb2a2a3 Michael Hanselmann
438 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
439 be760ba8 Michael Hanselmann
    assert shared == 1
440 be760ba8 Michael Hanselmann
    self._acquired = True
441 be760ba8 Michael Hanselmann
442 be760ba8 Michael Hanselmann
  def release(self):
443 be760ba8 Michael Hanselmann
    assert self._acquired
444 be760ba8 Michael Hanselmann
    self._acquired = False
445 be760ba8 Michael Hanselmann
446 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
447 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
448 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
449 be760ba8 Michael Hanselmann
450 be760ba8 Michael Hanselmann
451 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
452 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
453 ebb2a2a3 Michael Hanselmann
    self._queue = queue
454 be760ba8 Michael Hanselmann
    self._before_start = before_start
455 be760ba8 Michael Hanselmann
    self._after_start = after_start
456 be760ba8 Michael Hanselmann
457 f23db633 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None, priority=None):
458 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
459 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
460 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
461 be760ba8 Michael Hanselmann
462 be760ba8 Michael Hanselmann
    if self._before_start:
463 f23db633 Michael Hanselmann
      self._before_start(timeout, priority)
464 be760ba8 Michael Hanselmann
465 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
466 be760ba8 Michael Hanselmann
467 be760ba8 Michael Hanselmann
    if self._after_start:
468 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
469 be760ba8 Michael Hanselmann
470 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
471 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
472 ebb2a2a3 Michael Hanselmann
473 be760ba8 Michael Hanselmann
    if op.fail:
474 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
475 be760ba8 Michael Hanselmann
476 be760ba8 Michael Hanselmann
    return op.result
477 be760ba8 Michael Hanselmann
478 be760ba8 Michael Hanselmann
479 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
480 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
481 be760ba8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops)
482 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
483 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
484 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
485 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
486 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
487 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
488 be760ba8 Michael Hanselmann
    return job
489 be760ba8 Michael Hanselmann
490 26d3fd2f Michael Hanselmann
491 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
493 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
494 be760ba8 Michael Hanselmann
                      for op in job.ops)
495 be760ba8 Michael Hanselmann
496 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
498 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
499 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
500 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
502 be760ba8 Michael Hanselmann
                      job.start_timestamp,
503 be760ba8 Michael Hanselmann
                      job.end_timestamp])
504 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
505 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
506 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
507 be760ba8 Michael Hanselmann
508 be760ba8 Michael Hanselmann
  def testSuccess(self):
509 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
510 be760ba8 Michael Hanselmann
511 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
512 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
513 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514 be760ba8 Michael Hanselmann
             for i in range(opcount)]
515 be760ba8 Michael Hanselmann
516 be760ba8 Michael Hanselmann
      # Create job
517 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
518 be760ba8 Michael Hanselmann
519 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
520 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
521 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
522 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
523 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
525 be760ba8 Michael Hanselmann
526 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
527 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
528 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
529 ebb2a2a3 Michael Hanselmann
530 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
531 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
533 be760ba8 Michael Hanselmann
534 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
535 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
536 be760ba8 Michael Hanselmann
        self.assertFalse(success)
537 be760ba8 Michael Hanselmann
538 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
539 be760ba8 Michael Hanselmann
540 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
541 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
542 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
543 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
544 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
545 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
546 be760ba8 Michael Hanselmann
          # Last opcode
547 be760ba8 Michael Hanselmann
          self.assert_(result)
548 be760ba8 Michael Hanselmann
        else:
549 be760ba8 Michael Hanselmann
          self.assertFalse(result)
550 be760ba8 Michael Hanselmann
551 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
552 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
553 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
554 be760ba8 Michael Hanselmann
555 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
556 ebb2a2a3 Michael Hanselmann
557 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
558 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
559 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
560 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
561 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
562 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
563 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
564 be760ba8 Michael Hanselmann
                              for op in job.ops))
565 be760ba8 Michael Hanselmann
566 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
567 be760ba8 Michael Hanselmann
568 be760ba8 Michael Hanselmann
      # Finished jobs can't be processed any further
569 be760ba8 Michael Hanselmann
      self.assertRaises(errors.ProgrammerError,
570 be760ba8 Michael Hanselmann
                        jqueue._JobProcessor(queue, opexec, job))
571 be760ba8 Michael Hanselmann
572 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
573 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
574 be760ba8 Michael Hanselmann
575 be760ba8 Michael Hanselmann
    testdata = [
576 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
577 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
578 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
579 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
580 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
581 be760ba8 Michael Hanselmann
      ]
582 be760ba8 Michael Hanselmann
583 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
584 be760ba8 Michael Hanselmann
      # Prepare opcodes
585 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
586 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
587 be760ba8 Michael Hanselmann
                                       i <= failto))
588 be760ba8 Michael Hanselmann
             for i in range(opcount)]
589 be760ba8 Michael Hanselmann
590 be760ba8 Michael Hanselmann
      # Create job
591 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
592 be760ba8 Michael Hanselmann
593 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
594 be760ba8 Michael Hanselmann
595 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
596 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
597 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
598 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
599 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
600 ebb2a2a3 Michael Hanselmann
        # waitlock to running
601 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
602 ebb2a2a3 Michael Hanselmann
        # Opcode result
603 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
604 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
605 be760ba8 Michael Hanselmann
606 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
607 be760ba8 Michael Hanselmann
          # Last opcode
608 be760ba8 Michael Hanselmann
          self.assert_(result)
609 be760ba8 Michael Hanselmann
          break
610 be760ba8 Michael Hanselmann
611 be760ba8 Michael Hanselmann
        self.assertFalse(result)
612 be760ba8 Michael Hanselmann
613 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
614 be760ba8 Michael Hanselmann
615 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
616 ebb2a2a3 Michael Hanselmann
617 be760ba8 Michael Hanselmann
      # Check job status
618 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
619 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
620 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
621 be760ba8 Michael Hanselmann
622 be760ba8 Michael Hanselmann
      # Check opcode status
623 be760ba8 Michael Hanselmann
      data = zip(job.ops,
624 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
625 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
626 be760ba8 Michael Hanselmann
627 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
628 be760ba8 Michael Hanselmann
        if idx < failfrom:
629 be760ba8 Michael Hanselmann
          assert not op.input.fail
630 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
631 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
632 be760ba8 Michael Hanselmann
        elif idx <= failto:
633 be760ba8 Michael Hanselmann
          assert op.input.fail
634 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
635 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
636 be760ba8 Michael Hanselmann
        else:
637 be760ba8 Michael Hanselmann
          assert not op.input.fail
638 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
639 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
640 be760ba8 Michael Hanselmann
641 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
642 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
643 be760ba8 Michael Hanselmann
644 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
645 be760ba8 Michael Hanselmann
646 be760ba8 Michael Hanselmann
      # Finished jobs can't be processed any further
647 be760ba8 Michael Hanselmann
      self.assertRaises(errors.ProgrammerError,
648 be760ba8 Michael Hanselmann
                        jqueue._JobProcessor(queue, opexec, job))
649 be760ba8 Michael Hanselmann
650 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
651 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
652 be760ba8 Michael Hanselmann
653 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
654 be760ba8 Michael Hanselmann
           for i in range(5)]
655 be760ba8 Michael Hanselmann
656 be760ba8 Michael Hanselmann
    # Create job
657 be760ba8 Michael Hanselmann
    job_id = 17045
658 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
659 be760ba8 Michael Hanselmann
660 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
661 be760ba8 Michael Hanselmann
662 be760ba8 Michael Hanselmann
    # Mark as cancelled
663 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
664 be760ba8 Michael Hanselmann
    self.assert_(success)
665 be760ba8 Michael Hanselmann
666 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
667 ebb2a2a3 Michael Hanselmann
668 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
669 be760ba8 Michael Hanselmann
                            for op in job.ops))
670 be760ba8 Michael Hanselmann
671 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
672 30c945d0 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
673 30c945d0 Michael Hanselmann
674 30c945d0 Michael Hanselmann
    # Check result
675 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
676 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
677 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
678 30c945d0 Michael Hanselmann
    self.assert_(job.end_timestamp)
679 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
680 30c945d0 Michael Hanselmann
                                for op in job.ops))
681 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
682 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
683 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
684 30c945d0 Michael Hanselmann
685 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
686 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
687 30c945d0 Michael Hanselmann
688 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
689 30c945d0 Michael Hanselmann
           for i in range(5)]
690 30c945d0 Michael Hanselmann
691 30c945d0 Michael Hanselmann
    # Create job
692 30c945d0 Michael Hanselmann
    job_id = 8645
693 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
694 30c945d0 Michael Hanselmann
695 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
696 30c945d0 Michael Hanselmann
697 30c945d0 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
698 30c945d0 Michael Hanselmann
699 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
700 30c945d0 Michael Hanselmann
701 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
702 30c945d0 Michael Hanselmann
703 30c945d0 Michael Hanselmann
    # Mark as cancelling
704 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
705 30c945d0 Michael Hanselmann
    self.assert_(success)
706 30c945d0 Michael Hanselmann
707 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
708 30c945d0 Michael Hanselmann
709 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
710 30c945d0 Michael Hanselmann
                            for op in job.ops))
711 30c945d0 Michael Hanselmann
712 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
713 30c945d0 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
714 be760ba8 Michael Hanselmann
715 be760ba8 Michael Hanselmann
    # Check result
716 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
717 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
718 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
719 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
720 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
721 be760ba8 Michael Hanselmann
                                for op in job.ops))
722 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
723 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
724 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
725 be760ba8 Michael Hanselmann
726 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
727 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
728 be760ba8 Michael Hanselmann
729 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
730 be760ba8 Michael Hanselmann
           for i in range(5)]
731 be760ba8 Michael Hanselmann
732 be760ba8 Michael Hanselmann
    # Create job
733 be760ba8 Michael Hanselmann
    job_id = 11009
734 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
735 be760ba8 Michael Hanselmann
736 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
737 be760ba8 Michael Hanselmann
738 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
739 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
740 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
741 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
742 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
743 be760ba8 Michael Hanselmann
744 be760ba8 Michael Hanselmann
      # Mark as cancelled
745 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
746 be760ba8 Michael Hanselmann
      self.assert_(success)
747 be760ba8 Michael Hanselmann
748 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
749 be760ba8 Michael Hanselmann
                              for op in job.ops))
750 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
751 be760ba8 Michael Hanselmann
752 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
753 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
754 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
755 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
756 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
757 be760ba8 Michael Hanselmann
758 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
759 be760ba8 Michael Hanselmann
760 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
761 ebb2a2a3 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
762 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
763 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
764 be760ba8 Michael Hanselmann
765 be760ba8 Michael Hanselmann
    # Check result
766 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
767 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
768 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
769 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
770 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
771 be760ba8 Michael Hanselmann
                                for op in job.ops))
772 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
773 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
774 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
775 be760ba8 Michael Hanselmann
776 9e49dfc5 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
777 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
778 9e49dfc5 Michael Hanselmann
779 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
780 9e49dfc5 Michael Hanselmann
           for i in range(5)]
781 9e49dfc5 Michael Hanselmann
782 9e49dfc5 Michael Hanselmann
    # Create job
783 9e49dfc5 Michael Hanselmann
    job_id = 24314
784 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
785 9e49dfc5 Michael Hanselmann
786 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
787 9e49dfc5 Michael Hanselmann
788 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
789 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
790 9e49dfc5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
791 9e49dfc5 Michael Hanselmann
792 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
793 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
794 9e49dfc5 Michael Hanselmann
      self.assert_(success)
795 9e49dfc5 Michael Hanselmann
796 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
797 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
798 9e49dfc5 Michael Hanselmann
799 9e49dfc5 Michael Hanselmann
      # Fake an acquire attempt timing out
800 9e49dfc5 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
801 9e49dfc5 Michael Hanselmann
802 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
803 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
804 9e49dfc5 Michael Hanselmann
805 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
806 9e49dfc5 Michael Hanselmann
807 9e49dfc5 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
808 9e49dfc5 Michael Hanselmann
809 9e49dfc5 Michael Hanselmann
    # Check result
810 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
811 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
812 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
813 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
814 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
815 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
816 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
817 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
818 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
819 9e49dfc5 Michael Hanselmann
820 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
821 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
822 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
823 be760ba8 Michael Hanselmann
824 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
825 be760ba8 Michael Hanselmann
           for i in range(3)]
826 be760ba8 Michael Hanselmann
827 be760ba8 Michael Hanselmann
    # Create job
828 be760ba8 Michael Hanselmann
    job_id = 28492
829 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
830 be760ba8 Michael Hanselmann
831 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
832 be760ba8 Michael Hanselmann
833 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
834 be760ba8 Michael Hanselmann
835 be760ba8 Michael Hanselmann
    # Run one opcode
836 be760ba8 Michael Hanselmann
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
837 be760ba8 Michael Hanselmann
838 be760ba8 Michael Hanselmann
    # Job goes back to queued
839 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
840 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
841 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
842 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
843 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
844 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
845 be760ba8 Michael Hanselmann
846 be760ba8 Michael Hanselmann
    # Mark as cancelled
847 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
848 be760ba8 Michael Hanselmann
    self.assert_(success)
849 be760ba8 Michael Hanselmann
850 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
851 be760ba8 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
852 be760ba8 Michael Hanselmann
853 be760ba8 Michael Hanselmann
    # Check result
854 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
855 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
856 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
857 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
858 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
859 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
860 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
861 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
862 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
863 be760ba8 Michael Hanselmann
864 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
865 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
866 be760ba8 Michael Hanselmann
    # program was restarted
867 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
868 be760ba8 Michael Hanselmann
869 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
870 be760ba8 Michael Hanselmann
871 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
872 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
873 be760ba8 Michael Hanselmann
             for i in range(10)]
874 be760ba8 Michael Hanselmann
875 be760ba8 Michael Hanselmann
      # Create job
876 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
877 be760ba8 Michael Hanselmann
878 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879 be760ba8 Michael Hanselmann
880 be760ba8 Michael Hanselmann
      for _ in range(successcount):
881 be760ba8 Michael Hanselmann
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
882 be760ba8 Michael Hanselmann
883 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
884 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
885 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
886 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
887 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
888 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
889 be760ba8 Michael Hanselmann
890 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
891 be760ba8 Michael Hanselmann
892 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
893 be760ba8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
894 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
895 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
896 be760ba8 Michael Hanselmann
897 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
898 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
899 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
900 be760ba8 Michael Hanselmann
901 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
902 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
903 be760ba8 Michael Hanselmann
904 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
905 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
906 be760ba8 Michael Hanselmann
907 be760ba8 Michael Hanselmann
      if remaining == 0:
908 be760ba8 Michael Hanselmann
        # Last opcode
909 be760ba8 Michael Hanselmann
        self.assert_(result)
910 be760ba8 Michael Hanselmann
        break
911 be760ba8 Michael Hanselmann
912 be760ba8 Michael Hanselmann
      self.assertFalse(result)
913 be760ba8 Michael Hanselmann
914 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
915 be760ba8 Michael Hanselmann
916 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
917 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
918 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
919 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
920 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
921 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
922 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
923 be760ba8 Michael Hanselmann
                            for op in job.ops))
924 be760ba8 Michael Hanselmann
925 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
926 be760ba8 Michael Hanselmann
927 be760ba8 Michael Hanselmann
    # Finished jobs can't be processed any further
928 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
929 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
930 be760ba8 Michael Hanselmann
931 be760ba8 Michael Hanselmann
    # ... also after being restored
932 be760ba8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
933 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
934 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job2))
935 be760ba8 Michael Hanselmann
936 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
937 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
938 be760ba8 Michael Hanselmann
939 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
940 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
941 be760ba8 Michael Hanselmann
942 be760ba8 Michael Hanselmann
    # Create job
943 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
944 be760ba8 Michael Hanselmann
945 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
946 be760ba8 Michael Hanselmann
947 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
948 be760ba8 Michael Hanselmann
949 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
950 be760ba8 Michael Hanselmann
951 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
952 be760ba8 Michael Hanselmann
953 be760ba8 Michael Hanselmann
    # Calling on running job must fail
954 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
955 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
956 be760ba8 Michael Hanselmann
957 be760ba8 Michael Hanselmann
  def testLogMessages(self):
958 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
959 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
960 be760ba8 Michael Hanselmann
961 be760ba8 Michael Hanselmann
    messages = {
962 be760ba8 Michael Hanselmann
      1: [
963 be760ba8 Michael Hanselmann
        (None, "Hello"),
964 be760ba8 Michael Hanselmann
        (None, "World"),
965 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
966 be760ba8 Michael Hanselmann
        ],
967 be760ba8 Michael Hanselmann
      4: [
968 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
969 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
970 be760ba8 Michael Hanselmann
        ],
971 be760ba8 Michael Hanselmann
      }
972 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
973 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
974 be760ba8 Michael Hanselmann
           for i in range(5)]
975 be760ba8 Michael Hanselmann
976 be760ba8 Michael Hanselmann
    # Create job
977 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
978 be760ba8 Michael Hanselmann
979 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
980 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
981 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
982 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
983 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
984 be760ba8 Michael Hanselmann
985 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
986 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
987 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
988 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
989 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
990 be760ba8 Michael Hanselmann
991 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
992 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
993 be760ba8 Michael Hanselmann
994 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
995 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
996 be760ba8 Michael Hanselmann
        if log_type:
997 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
998 be760ba8 Michael Hanselmann
        else:
999 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1000 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1001 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1002 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1003 be760ba8 Michael Hanselmann
1004 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1005 be760ba8 Michael Hanselmann
1006 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1007 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1008 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1009 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1010 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1011 be760ba8 Michael Hanselmann
1012 be760ba8 Michael Hanselmann
      if remaining == 0:
1013 be760ba8 Michael Hanselmann
        # Last opcode
1014 be760ba8 Michael Hanselmann
        self.assert_(result)
1015 be760ba8 Michael Hanselmann
        break
1016 be760ba8 Michael Hanselmann
1017 be760ba8 Michael Hanselmann
      self.assertFalse(result)
1018 be760ba8 Michael Hanselmann
1019 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1020 be760ba8 Michael Hanselmann
1021 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1022 ebb2a2a3 Michael Hanselmann
1023 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1024 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1025 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1026 be760ba8 Michael Hanselmann
1027 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1028 be760ba8 Michael Hanselmann
1029 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1030 be760ba8 Michael Hanselmann
1031 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1032 be760ba8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1033 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1034 be760ba8 Michael Hanselmann
1035 be760ba8 Michael Hanselmann
    # Check each message
1036 be760ba8 Michael Hanselmann
    prevserial = -1
1037 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1038 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1039 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1040 be760ba8 Michael Hanselmann
        if exptype:
1041 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1042 be760ba8 Michael Hanselmann
        else:
1043 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1044 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1045 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1046 be760ba8 Michael Hanselmann
        prevserial = serial
1047 be760ba8 Michael Hanselmann
1048 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1049 be760ba8 Michael Hanselmann
    # Check serial
1050 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1051 be760ba8 Michael Hanselmann
1052 be760ba8 Michael Hanselmann
    # No filter
1053 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1054 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1055 be760ba8 Michael Hanselmann
                      for entry in entries])
1056 be760ba8 Michael Hanselmann
1057 be760ba8 Michael Hanselmann
    # Filter with serial
1058 be760ba8 Michael Hanselmann
    assert count > 3
1059 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1060 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1061 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1062 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1063 be760ba8 Michael Hanselmann
1064 be760ba8 Michael Hanselmann
    # No log message after highest serial
1065 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1066 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1067 be760ba8 Michael Hanselmann
1068 be760ba8 Michael Hanselmann
1069 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
1070 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
1071 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
1072 26d3fd2f Michael Hanselmann
    self.attempts = 0
1073 26d3fd2f Michael Hanselmann
    self.last_timeout = None
1074 26d3fd2f Michael Hanselmann
1075 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
1076 26d3fd2f Michael Hanselmann
    self.attempts += 1
1077 26d3fd2f Michael Hanselmann
    if self.timeouts:
1078 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
1079 26d3fd2f Michael Hanselmann
    else:
1080 26d3fd2f Michael Hanselmann
      timeout = None
1081 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
1082 26d3fd2f Michael Hanselmann
    return timeout
1083 26d3fd2f Michael Hanselmann
1084 26d3fd2f Michael Hanselmann
1085 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1086 26d3fd2f Michael Hanselmann
  def setUp(self):
1087 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
1088 26d3fd2f Michael Hanselmann
    self.job = None
1089 26d3fd2f Michael Hanselmann
    self.curop = None
1090 26d3fd2f Michael Hanselmann
    self.opcounter = None
1091 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
1092 26d3fd2f Michael Hanselmann
    self.retries = 0
1093 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
1094 26d3fd2f Michael Hanselmann
    self.prev_prio = None
1095 5fd6b694 Michael Hanselmann
    self.prev_status = None
1096 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
1097 26d3fd2f Michael Hanselmann
    self.gave_lock = None
1098 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
1099 26d3fd2f Michael Hanselmann
1100 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
1101 26d3fd2f Michael Hanselmann
    job = self.job
1102 26d3fd2f Michael Hanselmann
1103 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
1104 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
1105 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1106 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1107 5fd6b694 Michael Hanselmann
1108 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1109 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1110 26d3fd2f Michael Hanselmann
1111 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
1112 26d3fd2f Michael Hanselmann
1113 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1114 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
1115 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
1116 26d3fd2f Michael Hanselmann
1117 26d3fd2f Michael Hanselmann
    self.gave_lock = True
1118 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
1119 26d3fd2f Michael Hanselmann
1120 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
1121 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1122 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
1123 26d3fd2f Michael Hanselmann
      assert self.retries == 7
1124 26d3fd2f Michael Hanselmann
      self.retries = 0
1125 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
1126 26d3fd2f Michael Hanselmann
      return
1127 26d3fd2f Michael Hanselmann
1128 26d3fd2f Michael Hanselmann
    if self.retries > 0:
1129 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
1130 26d3fd2f Michael Hanselmann
      self.retries -= 1
1131 26d3fd2f Michael Hanselmann
      self.gave_lock = False
1132 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1133 26d3fd2f Michael Hanselmann
1134 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1135 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1136 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
1137 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
1138 26d3fd2f Michael Hanselmann
1139 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
1140 26d3fd2f Michael Hanselmann
    job = self.job
1141 26d3fd2f Michael Hanselmann
1142 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
1143 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1144 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1145 5fd6b694 Michael Hanselmann
1146 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1147 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1148 26d3fd2f Michael Hanselmann
1149 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
1150 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
1151 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
1152 26d3fd2f Michael Hanselmann
1153 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1154 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1155 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1156 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
1157 26d3fd2f Michael Hanselmann
1158 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1159 26d3fd2f Michael Hanselmann
    job = self.job
1160 26d3fd2f Michael Hanselmann
1161 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1162 26d3fd2f Michael Hanselmann
1163 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1164 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1165 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1166 26d3fd2f Michael Hanselmann
1167 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1168 26d3fd2f Michael Hanselmann
      # Normal retry
1169 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1170 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1171 26d3fd2f Michael Hanselmann
1172 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1173 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1174 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1175 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1176 26d3fd2f Michael Hanselmann
1177 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1178 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1179 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1180 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1181 26d3fd2f Michael Hanselmann
1182 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1183 26d3fd2f Michael Hanselmann
1184 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1185 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1186 26d3fd2f Michael Hanselmann
1187 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1188 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1189 26d3fd2f Michael Hanselmann
      self.retries = 0
1190 26d3fd2f Michael Hanselmann
1191 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1192 26d3fd2f Michael Hanselmann
      # Normal retry
1193 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1194 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1195 26d3fd2f Michael Hanselmann
1196 26d3fd2f Michael Hanselmann
    else:
1197 26d3fd2f Michael Hanselmann
      timeouts = []
1198 26d3fd2f Michael Hanselmann
      self.retries = 0
1199 26d3fd2f Michael Hanselmann
1200 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1201 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1202 26d3fd2f Michael Hanselmann
1203 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1204 26d3fd2f Michael Hanselmann
1205 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1206 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1207 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1208 26d3fd2f Michael Hanselmann
1209 26d3fd2f Michael Hanselmann
    return ts
1210 26d3fd2f Michael Hanselmann
1211 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1212 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1213 26d3fd2f Michael Hanselmann
           for i in range(10)]
1214 26d3fd2f Michael Hanselmann
1215 26d3fd2f Michael Hanselmann
    # Create job
1216 26d3fd2f Michael Hanselmann
    job_id = 15801
1217 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1218 26d3fd2f Michael Hanselmann
    self.job = job
1219 26d3fd2f Michael Hanselmann
1220 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1221 26d3fd2f Michael Hanselmann
1222 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1223 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
1224 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1225 26d3fd2f Michael Hanselmann
1226 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1227 26d3fd2f Michael Hanselmann
1228 5fd6b694 Michael Hanselmann
    while True:
1229 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1230 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1231 26d3fd2f Michael Hanselmann
1232 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1233 5fd6b694 Michael Hanselmann
1234 5fd6b694 Michael Hanselmann
      if self.curop is not None:
1235 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
1236 5fd6b694 Michael Hanselmann
1237 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
1238 5fd6b694 Michael Hanselmann
1239 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1240 5fd6b694 Michael Hanselmann
      assert self.curop is not None
1241 5fd6b694 Michael Hanselmann
1242 5fd6b694 Michael Hanselmann
      if result or self.gave_lock:
1243 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
1244 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1245 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1246 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1247 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1248 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
1249 5fd6b694 Michael Hanselmann
1250 26d3fd2f Michael Hanselmann
      if result:
1251 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1252 26d3fd2f Michael Hanselmann
        break
1253 26d3fd2f Michael Hanselmann
1254 26d3fd2f Michael Hanselmann
      self.assertFalse(result)
1255 26d3fd2f Michael Hanselmann
1256 5fd6b694 Michael Hanselmann
      if self.curop == 0:
1257 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
1258 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
1259 5fd6b694 Michael Hanselmann
1260 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1261 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
1262 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1263 26d3fd2f Michael Hanselmann
      else:
1264 5fd6b694 Michael Hanselmann
        # Did not get locks
1265 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1266 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1267 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1268 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1269 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1270 5fd6b694 Michael Hanselmann
1271 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
1272 5fd6b694 Michael Hanselmann
        # updated
1273 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
1274 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1275 5fd6b694 Michael Hanselmann
1276 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1277 26d3fd2f Michael Hanselmann
1278 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1279 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1280 26d3fd2f Michael Hanselmann
1281 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1282 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1283 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1284 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1285 26d3fd2f Michael Hanselmann
1286 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1287 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1288 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1289 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1290 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1291 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1292 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1293 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1294 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1295 26d3fd2f Michael Hanselmann
1296 26d3fd2f Michael Hanselmann
    # Finished jobs can't be processed any further
1297 26d3fd2f Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1298 26d3fd2f Michael Hanselmann
                      jqueue._JobProcessor(self.queue, opexec, job))
1299 26d3fd2f Michael Hanselmann
1300 26d3fd2f Michael Hanselmann
1301 989a8bee Michael Hanselmann
if __name__ == "__main__":
1302 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()