Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ cefd4a4a

History | View | Annotate | Download (94.7 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 76b62028 Iustin Pop
# Copyright (C) 2010, 2011, 2012 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 4679547e Michael Hanselmann
import operator
33 989a8bee Michael Hanselmann
34 989a8bee Michael Hanselmann
from ganeti import constants
35 989a8bee Michael Hanselmann
from ganeti import utils
36 989a8bee Michael Hanselmann
from ganeti import errors
37 989a8bee Michael Hanselmann
from ganeti import jqueue
38 8f5c488d Michael Hanselmann
from ganeti import opcodes
39 8f5c488d Michael Hanselmann
from ganeti import compat
40 26d3fd2f Michael Hanselmann
from ganeti import mcpu
41 fcb21ad7 Michael Hanselmann
from ganeti import query
42 df5a5730 Michael Hanselmann
from ganeti import workerpool
43 989a8bee Michael Hanselmann
44 989a8bee Michael Hanselmann
import testutils
45 989a8bee Michael Hanselmann
46 989a8bee Michael Hanselmann
47 989a8bee Michael Hanselmann
class _FakeJob:
48 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
49 989a8bee Michael Hanselmann
    self.id = job_id
50 c0f6d0d8 Michael Hanselmann
    self.writable = False
51 989a8bee Michael Hanselmann
    self._status = status
52 989a8bee Michael Hanselmann
    self._log = []
53 989a8bee Michael Hanselmann
54 989a8bee Michael Hanselmann
  def SetStatus(self, status):
55 989a8bee Michael Hanselmann
    self._status = status
56 989a8bee Michael Hanselmann
57 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
58 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
59 989a8bee Michael Hanselmann
60 989a8bee Michael Hanselmann
  def CalcStatus(self):
61 989a8bee Michael Hanselmann
    return self._status
62 989a8bee Michael Hanselmann
63 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
64 989a8bee Michael Hanselmann
    result = []
65 989a8bee Michael Hanselmann
66 989a8bee Michael Hanselmann
    for name in fields:
67 989a8bee Michael Hanselmann
      if name == "status":
68 989a8bee Michael Hanselmann
        result.append(self._status)
69 989a8bee Michael Hanselmann
      else:
70 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    return result
73 989a8bee Michael Hanselmann
74 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
75 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
76 989a8bee Michael Hanselmann
77 989a8bee Michael Hanselmann
    if newer_than is None:
78 989a8bee Michael Hanselmann
      return self._log
79 989a8bee Michael Hanselmann
80 989a8bee Michael Hanselmann
    return self._log[newer_than:]
81 989a8bee Michael Hanselmann
82 989a8bee Michael Hanselmann
83 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
84 989a8bee Michael Hanselmann
  def testStatus(self):
85 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
86 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
87 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
88 989a8bee Michael Hanselmann
89 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
90 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
91 989a8bee Michael Hanselmann
92 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
93 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
94 989a8bee Michael Hanselmann
95 989a8bee Michael Hanselmann
    # job.id is used by checker
96 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
97 989a8bee Michael Hanselmann
98 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
99 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
100 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
101 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
102 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
103 989a8bee Michael Hanselmann
104 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
105 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
106 989a8bee Michael Hanselmann
107 989a8bee Michael Hanselmann
  def testFinalStatus(self):
108 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
109 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
110 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
111 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
112 989a8bee Michael Hanselmann
      # a change immediately
113 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
114 989a8bee Michael Hanselmann
115 989a8bee Michael Hanselmann
  def testLog(self):
116 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
117 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
118 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
119 989a8bee Michael Hanselmann
120 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
121 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
122 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
123 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
124 989a8bee Michael Hanselmann
125 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
126 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
127 989a8bee Michael Hanselmann
128 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
129 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
132 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
134 989a8bee Michael Hanselmann
135 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
136 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
137 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
138 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
139 989a8bee Michael Hanselmann
140 989a8bee Michael Hanselmann
141 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
142 989a8bee Michael Hanselmann
  def setUp(self):
143 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
144 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
145 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
146 989a8bee Michael Hanselmann
147 989a8bee Michael Hanselmann
  def tearDown(self):
148 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
149 989a8bee Michael Hanselmann
150 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
151 989a8bee Michael Hanselmann
    try:
152 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
153 989a8bee Michael Hanselmann
    except EnvironmentError, err:
154 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
155 989a8bee Michael Hanselmann
    else:
156 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
157 989a8bee Michael Hanselmann
158 989a8bee Michael Hanselmann
  def testClose(self):
159 989a8bee Michael Hanselmann
    for wait in [False, True]:
160 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
161 989a8bee Michael Hanselmann
      try:
162 989a8bee Michael Hanselmann
        if wait:
163 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
164 989a8bee Michael Hanselmann
      finally:
165 989a8bee Michael Hanselmann
        waiter.Close()
166 989a8bee Michael Hanselmann
167 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
168 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
169 989a8bee Michael Hanselmann
170 989a8bee Michael Hanselmann
  def testChangingFile(self):
171 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
172 989a8bee Michael Hanselmann
    try:
173 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
174 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
175 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
176 989a8bee Michael Hanselmann
    finally:
177 989a8bee Michael Hanselmann
      waiter.Close()
178 989a8bee Michael Hanselmann
179 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
180 989a8bee Michael Hanselmann
181 989a8bee Michael Hanselmann
  def testChangingFile2(self):
182 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
183 989a8bee Michael Hanselmann
    try:
184 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
185 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
186 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
187 989a8bee Michael Hanselmann
188 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
189 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
190 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
191 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
192 989a8bee Michael Hanselmann
    finally:
193 989a8bee Michael Hanselmann
      waiter.Close()
194 989a8bee Michael Hanselmann
195 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196 989a8bee Michael Hanselmann
197 989a8bee Michael Hanselmann
198 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
199 989a8bee Michael Hanselmann
  def setUp(self):
200 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
201 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
202 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
203 989a8bee Michael Hanselmann
204 989a8bee Michael Hanselmann
  def tearDown(self):
205 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
206 989a8bee Michael Hanselmann
207 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
208 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
209 989a8bee Michael Hanselmann
210 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
211 989a8bee Michael Hanselmann
    return None
212 989a8bee Michael Hanselmann
213 989a8bee Michael Hanselmann
  def testNoChanges(self):
214 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
215 989a8bee Michael Hanselmann
216 989a8bee Michael Hanselmann
    # No change
217 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
218 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
219 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
220 989a8bee Michael Hanselmann
221 989a8bee Michael Hanselmann
    # No previous information
222 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
223 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
224 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
  def testLostJob(self):
227 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
228 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
229 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
230 989a8bee Michael Hanselmann
231 989a8bee Michael Hanselmann
232 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
233 6760e4ed Michael Hanselmann
  def test(self):
234 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
235 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
236 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
237 6760e4ed Michael Hanselmann
238 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
239 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
240 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
241 6760e4ed Michael Hanselmann
242 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
243 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
244 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245 6760e4ed Michael Hanselmann
246 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
247 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
248 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
249 6760e4ed Michael Hanselmann
250 6760e4ed Michael Hanselmann
251 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
252 8f5c488d Michael Hanselmann
  def testDefaults(self):
253 8f5c488d Michael Hanselmann
    def _Check(op):
254 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
255 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
256 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
257 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
258 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
259 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
260 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
261 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
262 8f5c488d Michael Hanselmann
263 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
264 8f5c488d Michael Hanselmann
    _Check(op1)
265 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
266 8f5c488d Michael Hanselmann
    _Check(op2)
267 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
268 8f5c488d Michael Hanselmann
269 8f5c488d Michael Hanselmann
  def testPriority(self):
270 8f5c488d Michael Hanselmann
    def _Check(op):
271 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
272 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
273 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
274 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
275 8f5c488d Michael Hanselmann
276 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
277 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
278 8f5c488d Michael Hanselmann
    _Check(op1)
279 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
280 8f5c488d Michael Hanselmann
    _Check(op2)
281 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
282 8f5c488d Michael Hanselmann
283 8f5c488d Michael Hanselmann
284 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
285 4679547e Michael Hanselmann
  def testNoOpCodes(self):
286 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
287 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
288 5f6b0b71 Michael Hanselmann
289 8f5c488d Michael Hanselmann
  def testDefaults(self):
290 8f5c488d Michael Hanselmann
    job_id = 4260
291 8f5c488d Michael Hanselmann
    ops = [
292 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
293 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
294 8f5c488d Michael Hanselmann
      ]
295 8f5c488d Michael Hanselmann
296 8f5c488d Michael Hanselmann
    def _Check(job):
297 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
298 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
299 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
300 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
301 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
302 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
303 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
304 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
305 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
306 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
307 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
308 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
309 a06c6ae8 Michael Hanselmann
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
310 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
311 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
312 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
313 8a3cd185 Michael Hanselmann
      self.assertFalse(job.archived)
314 8f5c488d Michael Hanselmann
315 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
316 8f5c488d Michael Hanselmann
    _Check(job1)
317 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
318 8f5c488d Michael Hanselmann
    _Check(job2)
319 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
320 8f5c488d Michael Hanselmann
321 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
322 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
323 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
324 c0f6d0d8 Michael Hanselmann
325 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
326 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
327 c0f6d0d8 Michael Hanselmann
328 8a3cd185 Michael Hanselmann
  def testArchived(self):
329 8a3cd185 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
330 8a3cd185 Michael Hanselmann
    self.assertFalse(job.archived)
331 8a3cd185 Michael Hanselmann
332 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
333 8a3cd185 Michael Hanselmann
    self.assertTrue(newjob.archived)
334 8a3cd185 Michael Hanselmann
335 8a3cd185 Michael Hanselmann
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
336 8a3cd185 Michael Hanselmann
    self.assertFalse(newjob2.archived)
337 8a3cd185 Michael Hanselmann
338 8f5c488d Michael Hanselmann
  def testPriority(self):
339 8f5c488d Michael Hanselmann
    job_id = 4283
340 8f5c488d Michael Hanselmann
    ops = [
341 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
342 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
343 8f5c488d Michael Hanselmann
      ]
344 8f5c488d Michael Hanselmann
345 8f5c488d Michael Hanselmann
    def _Check(job):
346 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
347 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
348 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
349 8f5c488d Michael Hanselmann
350 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
351 8f5c488d Michael Hanselmann
    _Check(job)
352 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
353 8f5c488d Michael Hanselmann
                            for op in job.ops))
354 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
355 8f5c488d Michael Hanselmann
356 8f5c488d Michael Hanselmann
    # Increase first
357 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
358 8f5c488d Michael Hanselmann
    _Check(job)
359 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
360 8f5c488d Michael Hanselmann
361 8f5c488d Michael Hanselmann
    # Mark opcode as finished
362 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
363 8f5c488d Michael Hanselmann
    _Check(job)
364 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
365 8f5c488d Michael Hanselmann
366 8f5c488d Michael Hanselmann
    # Increase second
367 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
368 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
369 8f5c488d Michael Hanselmann
370 8f5c488d Michael Hanselmann
    # Test increasing first
371 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
372 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
373 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
374 8f5c488d Michael Hanselmann
375 4679547e Michael Hanselmann
  def _JobForPriority(self, job_id):
376 4679547e Michael Hanselmann
    ops = [
377 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
378 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
379 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
380 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
381 4679547e Michael Hanselmann
      ]
382 4679547e Michael Hanselmann
383 4679547e Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
384 4679547e Michael Hanselmann
385 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
386 4679547e Michael Hanselmann
                               for op in job.ops))
387 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
388 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
389 4679547e Michael Hanselmann
                                for op in job.ops))
390 4679547e Michael Hanselmann
391 4679547e Michael Hanselmann
    return job
392 4679547e Michael Hanselmann
393 4679547e Michael Hanselmann
  def testChangePriorityAllQueued(self):
394 4679547e Michael Hanselmann
    job = self._JobForPriority(24984)
395 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
396 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
397 4679547e Michael Hanselmann
                               for op in job.ops))
398 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
399 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
400 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
401 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
402 3c631ea2 Michael Hanselmann
                                for op in job.ops))
403 4679547e Michael Hanselmann
    self.assertEqual(result,
404 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job 24984 have"
405 4679547e Michael Hanselmann
                             " been changed to -10")))
406 4679547e Michael Hanselmann
407 4679547e Michael Hanselmann
  def testChangePriorityAllFinished(self):
408 4679547e Michael Hanselmann
    job = self._JobForPriority(16405)
409 4679547e Michael Hanselmann
410 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
411 4679547e Michael Hanselmann
      if idx > 2:
412 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
413 4679547e Michael Hanselmann
      else:
414 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
415 4679547e Michael Hanselmann
416 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
417 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
418 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
419 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
420 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
421 4679547e Michael Hanselmann
                               for op in job.ops))
422 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
423 4679547e Michael Hanselmann
                                for op in job.ops))
424 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
425 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
426 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
427 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
428 4679547e Michael Hanselmann
      constants.OP_STATUS_ERROR,
429 4679547e Michael Hanselmann
      ])
430 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 16405 is finished"))
431 4679547e Michael Hanselmann
432 4679547e Michael Hanselmann
  def testChangePriorityCancelling(self):
433 4679547e Michael Hanselmann
    job = self._JobForPriority(31572)
434 4679547e Michael Hanselmann
435 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
436 4679547e Michael Hanselmann
      if idx > 1:
437 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
438 4679547e Michael Hanselmann
      else:
439 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
440 4679547e Michael Hanselmann
441 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
442 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
443 4679547e Michael Hanselmann
    result = job.ChangePriority(5)
444 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
445 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
446 4679547e Michael Hanselmann
                               for op in job.ops))
447 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
448 4679547e Michael Hanselmann
                                for op in job.ops))
449 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
450 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
451 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
452 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
453 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
454 4679547e Michael Hanselmann
      ])
455 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
456 4679547e Michael Hanselmann
457 4679547e Michael Hanselmann
  def testChangePriorityFirstRunning(self):
458 4679547e Michael Hanselmann
    job = self._JobForPriority(1716215889)
459 4679547e Michael Hanselmann
460 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
461 4679547e Michael Hanselmann
      if idx == 0:
462 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
463 4679547e Michael Hanselmann
      else:
464 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
465 4679547e Michael Hanselmann
466 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
467 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
468 4679547e Michael Hanselmann
    result = job.ChangePriority(7)
469 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
470 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
471 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
472 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
473 3c631ea2 Michael Hanselmann
                                for op in job.ops))
474 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
475 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
476 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
477 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
478 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
479 4679547e Michael Hanselmann
      ])
480 4679547e Michael Hanselmann
    self.assertEqual(result,
481 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
482 4679547e Michael Hanselmann
                             " 1716215889 have been changed to 7")))
483 4679547e Michael Hanselmann
484 4679547e Michael Hanselmann
  def testChangePriorityLastRunning(self):
485 4679547e Michael Hanselmann
    job = self._JobForPriority(1308)
486 4679547e Michael Hanselmann
487 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
488 4679547e Michael Hanselmann
      if idx == (len(job.ops) - 1):
489 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
490 4679547e Michael Hanselmann
      else:
491 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
492 4679547e Michael Hanselmann
493 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
494 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
495 4679547e Michael Hanselmann
    result = job.ChangePriority(-3)
496 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
497 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
498 4679547e Michael Hanselmann
                               for op in job.ops))
499 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
500 4679547e Michael Hanselmann
                                for op in job.ops))
501 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
502 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
503 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
504 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
505 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
506 4679547e Michael Hanselmann
      ])
507 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
508 4679547e Michael Hanselmann
509 4679547e Michael Hanselmann
  def testChangePrioritySecondOpcodeRunning(self):
510 4679547e Michael Hanselmann
    job = self._JobForPriority(27701)
511 4679547e Michael Hanselmann
512 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
513 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
514 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
515 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
516 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_QUEUED
517 4679547e Michael Hanselmann
518 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
519 4679547e Michael Hanselmann
    result = job.ChangePriority(-19)
520 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -19)
521 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
522 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
523 4679547e Michael Hanselmann
                      -19, -19])
524 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
525 3c631ea2 Michael Hanselmann
                                for op in job.ops))
526 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
527 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
528 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
529 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
530 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
531 4679547e Michael Hanselmann
      ])
532 4679547e Michael Hanselmann
    self.assertEqual(result,
533 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
534 4679547e Michael Hanselmann
                             " 27701 have been changed to -19")))
535 4679547e Michael Hanselmann
536 4679547e Michael Hanselmann
  def testChangePriorityWithInconsistentJob(self):
537 4679547e Michael Hanselmann
    job = self._JobForPriority(30097)
538 4679547e Michael Hanselmann
539 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
540 4679547e Michael Hanselmann
541 4679547e Michael Hanselmann
    # This job is invalid (as it has two opcodes marked as running) and make
542 4679547e Michael Hanselmann
    # the call fail because an unprocessed opcode precedes a running one (which
543 4679547e Michael Hanselmann
    # should never happen in reality)
544 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
545 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
546 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
547 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_RUNNING
548 4679547e Michael Hanselmann
549 4679547e Michael Hanselmann
    self.assertRaises(AssertionError, job.ChangePriority, 19)
550 4679547e Michael Hanselmann
551 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
552 db5bce34 Michael Hanselmann
    def _Queued(ops):
553 db5bce34 Michael Hanselmann
      # The default status is "queued"
554 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
555 db5bce34 Michael Hanselmann
                              for op in ops))
556 db5bce34 Michael Hanselmann
557 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
558 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
559 db5bce34 Michael Hanselmann
560 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
561 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
562 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
563 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
564 db5bce34 Michael Hanselmann
565 db5bce34 Michael Hanselmann
    def _Running(ops):
566 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
567 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
568 db5bce34 Michael Hanselmann
      for op in ops[2:]:
569 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
570 db5bce34 Michael Hanselmann
571 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
572 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
573 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
574 db5bce34 Michael Hanselmann
      for op in ops[2:]:
575 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
576 db5bce34 Michael Hanselmann
577 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
578 db5bce34 Michael Hanselmann
      for op in ops:
579 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
580 db5bce34 Michael Hanselmann
581 db5bce34 Michael Hanselmann
    def _Canceled(ops):
582 db5bce34 Michael Hanselmann
      for op in ops:
583 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
584 db5bce34 Michael Hanselmann
585 db5bce34 Michael Hanselmann
    def _Error1(ops):
586 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
587 db5bce34 Michael Hanselmann
        if idx > 3:
588 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
589 db5bce34 Michael Hanselmann
        else:
590 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
591 db5bce34 Michael Hanselmann
592 db5bce34 Michael Hanselmann
    def _Error2(ops):
593 db5bce34 Michael Hanselmann
      for op in ops:
594 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
595 db5bce34 Michael Hanselmann
596 db5bce34 Michael Hanselmann
    def _Success(ops):
597 db5bce34 Michael Hanselmann
      for op in ops:
598 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
599 db5bce34 Michael Hanselmann
600 db5bce34 Michael Hanselmann
    tests = {
601 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
602 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
603 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
604 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
605 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
606 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
607 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
608 db5bce34 Michael Hanselmann
      }
609 db5bce34 Michael Hanselmann
610 db5bce34 Michael Hanselmann
    def _NewJob():
611 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
612 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
613 c0f6d0d8 Michael Hanselmann
                              True)
614 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
615 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
616 db5bce34 Michael Hanselmann
                              for op in job.ops))
617 db5bce34 Michael Hanselmann
      return job
618 db5bce34 Michael Hanselmann
619 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
620 db5bce34 Michael Hanselmann
      sttests = tests[status]
621 db5bce34 Michael Hanselmann
      assert sttests
622 db5bce34 Michael Hanselmann
      for fn in sttests:
623 db5bce34 Michael Hanselmann
        job = _NewJob()
624 db5bce34 Michael Hanselmann
        fn(job.ops)
625 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
626 db5bce34 Michael Hanselmann
627 8f5c488d Michael Hanselmann
628 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
629 be760ba8 Michael Hanselmann
  def __init__(self):
630 b95479a5 Michael Hanselmann
    self._checks = []
631 b95479a5 Michael Hanselmann
    self._notifications = []
632 b95479a5 Michael Hanselmann
    self._waiting = set()
633 b95479a5 Michael Hanselmann
634 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
635 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
636 b95479a5 Michael Hanselmann
637 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
638 b95479a5 Michael Hanselmann
    return len(self._checks)
639 b95479a5 Michael Hanselmann
640 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
641 b95479a5 Michael Hanselmann
    return len(self._waiting)
642 b95479a5 Michael Hanselmann
643 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
644 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
645 b95479a5 Michael Hanselmann
646 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
647 b95479a5 Michael Hanselmann
    return job in self._waiting
648 b95479a5 Michael Hanselmann
649 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
650 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
651 b95479a5 Michael Hanselmann
652 b95479a5 Michael Hanselmann
    assert exp_job == job
653 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
654 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
655 b95479a5 Michael Hanselmann
656 b95479a5 Michael Hanselmann
    (result_status, _) = result
657 b95479a5 Michael Hanselmann
658 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
659 b95479a5 Michael Hanselmann
      self._waiting.add(job)
660 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
661 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
662 b95479a5 Michael Hanselmann
663 b95479a5 Michael Hanselmann
    return result
664 b95479a5 Michael Hanselmann
665 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
666 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
667 b95479a5 Michael Hanselmann
668 b95479a5 Michael Hanselmann
669 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
670 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
671 b95479a5 Michael Hanselmann
    return False
672 b95479a5 Michael Hanselmann
673 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
674 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
675 b95479a5 Michael Hanselmann
676 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
677 b95479a5 Michael Hanselmann
    pass
678 b95479a5 Michael Hanselmann
679 b95479a5 Michael Hanselmann
680 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
681 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
682 be760ba8 Michael Hanselmann
    self._acquired = False
683 ebb2a2a3 Michael Hanselmann
    self._updates = []
684 6a373640 Michael Hanselmann
    self._submitted = []
685 942e2262 Michael Hanselmann
    self._accepting_jobs = True
686 6a373640 Michael Hanselmann
687 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
688 be760ba8 Michael Hanselmann
689 b95479a5 Michael Hanselmann
    if depmgr:
690 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
691 b95479a5 Michael Hanselmann
    else:
692 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
693 b95479a5 Michael Hanselmann
694 be760ba8 Michael Hanselmann
  def IsAcquired(self):
695 be760ba8 Michael Hanselmann
    return self._acquired
696 be760ba8 Michael Hanselmann
697 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
698 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
699 ebb2a2a3 Michael Hanselmann
700 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
701 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
702 6a373640 Michael Hanselmann
703 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
704 be760ba8 Michael Hanselmann
    assert shared == 1
705 be760ba8 Michael Hanselmann
    self._acquired = True
706 be760ba8 Michael Hanselmann
707 be760ba8 Michael Hanselmann
  def release(self):
708 be760ba8 Michael Hanselmann
    assert self._acquired
709 be760ba8 Michael Hanselmann
    self._acquired = False
710 be760ba8 Michael Hanselmann
711 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
712 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
713 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
714 be760ba8 Michael Hanselmann
715 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
716 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
717 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
718 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
719 6a373640 Michael Hanselmann
    return job_ids
720 6a373640 Michael Hanselmann
721 942e2262 Michael Hanselmann
  def StopAcceptingJobs(self):
722 942e2262 Michael Hanselmann
    self._accepting_jobs = False
723 942e2262 Michael Hanselmann
724 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
725 942e2262 Michael Hanselmann
    return self._accepting_jobs
726 942e2262 Michael Hanselmann
727 be760ba8 Michael Hanselmann
728 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
729 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
730 ebb2a2a3 Michael Hanselmann
    self._queue = queue
731 be760ba8 Michael Hanselmann
    self._before_start = before_start
732 be760ba8 Michael Hanselmann
    self._after_start = after_start
733 be760ba8 Michael Hanselmann
734 e4e59de8 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None):
735 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
736 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
737 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
738 be760ba8 Michael Hanselmann
739 be760ba8 Michael Hanselmann
    if self._before_start:
740 e4e59de8 Michael Hanselmann
      self._before_start(timeout, cbs.CurrentPriority())
741 be760ba8 Michael Hanselmann
742 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
743 be760ba8 Michael Hanselmann
744 be760ba8 Michael Hanselmann
    if self._after_start:
745 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
746 be760ba8 Michael Hanselmann
747 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
748 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
749 ebb2a2a3 Michael Hanselmann
750 be760ba8 Michael Hanselmann
    if op.fail:
751 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
752 be760ba8 Michael Hanselmann
753 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
754 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
755 6a373640 Michael Hanselmann
756 be760ba8 Michael Hanselmann
    return op.result
757 be760ba8 Michael Hanselmann
758 be760ba8 Michael Hanselmann
759 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
760 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
761 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
762 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
763 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
764 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
765 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
766 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
767 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
768 be760ba8 Michael Hanselmann
    return job
769 be760ba8 Michael Hanselmann
770 26d3fd2f Michael Hanselmann
771 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
772 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
773 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
774 be760ba8 Michael Hanselmann
                      for op in job.ops)
775 be760ba8 Michael Hanselmann
776 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
777 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
778 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
779 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
780 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
781 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
782 be760ba8 Michael Hanselmann
                      job.start_timestamp,
783 be760ba8 Michael Hanselmann
                      job.end_timestamp])
784 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
785 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
786 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
787 be760ba8 Michael Hanselmann
788 be760ba8 Michael Hanselmann
  def testSuccess(self):
789 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
790 be760ba8 Michael Hanselmann
791 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
792 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
793 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
794 be760ba8 Michael Hanselmann
             for i in range(opcount)]
795 be760ba8 Michael Hanselmann
796 be760ba8 Michael Hanselmann
      # Create job
797 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
798 be760ba8 Michael Hanselmann
799 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
800 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
801 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
802 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
803 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
804 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
805 be760ba8 Michael Hanselmann
806 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
807 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
808 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
809 ebb2a2a3 Michael Hanselmann
810 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
811 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
812 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
813 be760ba8 Michael Hanselmann
814 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
815 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
816 be760ba8 Michael Hanselmann
        self.assertFalse(success)
817 be760ba8 Michael Hanselmann
818 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
819 be760ba8 Michael Hanselmann
820 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
821 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
822 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
823 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
824 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
825 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
826 be760ba8 Michael Hanselmann
          # Last opcode
827 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
828 be760ba8 Michael Hanselmann
        else:
829 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
830 be760ba8 Michael Hanselmann
831 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
832 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
833 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
834 be760ba8 Michael Hanselmann
835 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
836 ebb2a2a3 Michael Hanselmann
837 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
838 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
839 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
840 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
841 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
842 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
843 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
844 be760ba8 Michael Hanselmann
                              for op in job.ops))
845 be760ba8 Michael Hanselmann
846 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
847 be760ba8 Michael Hanselmann
848 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
849 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
850 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
851 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
852 be760ba8 Michael Hanselmann
853 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
854 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
855 be760ba8 Michael Hanselmann
856 be760ba8 Michael Hanselmann
    testdata = [
857 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
858 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
859 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
860 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
861 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
862 be760ba8 Michael Hanselmann
      ]
863 be760ba8 Michael Hanselmann
864 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
865 be760ba8 Michael Hanselmann
      # Prepare opcodes
866 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
867 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
868 be760ba8 Michael Hanselmann
                                       i <= failto))
869 be760ba8 Michael Hanselmann
             for i in range(opcount)]
870 be760ba8 Michael Hanselmann
871 be760ba8 Michael Hanselmann
      # Create job
872 a06c6ae8 Michael Hanselmann
      job = self._CreateJob(queue, str(job_id), ops)
873 be760ba8 Michael Hanselmann
874 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
875 be760ba8 Michael Hanselmann
876 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
877 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
878 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
879 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
880 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
881 ebb2a2a3 Michael Hanselmann
        # waitlock to running
882 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
883 ebb2a2a3 Michael Hanselmann
        # Opcode result
884 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
885 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
886 be760ba8 Michael Hanselmann
887 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
888 be760ba8 Michael Hanselmann
          # Last opcode
889 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
890 be760ba8 Michael Hanselmann
          break
891 be760ba8 Michael Hanselmann
892 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
893 be760ba8 Michael Hanselmann
894 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
895 be760ba8 Michael Hanselmann
896 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
897 ebb2a2a3 Michael Hanselmann
898 be760ba8 Michael Hanselmann
      # Check job status
899 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
900 76b62028 Iustin Pop
      self.assertEqual(job.GetInfo(["id"]), [job_id])
901 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
902 be760ba8 Michael Hanselmann
903 be760ba8 Michael Hanselmann
      # Check opcode status
904 be760ba8 Michael Hanselmann
      data = zip(job.ops,
905 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
906 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
907 be760ba8 Michael Hanselmann
908 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
909 be760ba8 Michael Hanselmann
        if idx < failfrom:
910 be760ba8 Michael Hanselmann
          assert not op.input.fail
911 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
912 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
913 be760ba8 Michael Hanselmann
        elif idx <= failto:
914 be760ba8 Michael Hanselmann
          assert op.input.fail
915 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
916 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
917 be760ba8 Michael Hanselmann
        else:
918 be760ba8 Michael Hanselmann
          assert not op.input.fail
919 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
920 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
921 be760ba8 Michael Hanselmann
922 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
923 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
924 be760ba8 Michael Hanselmann
925 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
926 be760ba8 Michael Hanselmann
927 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
928 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
929 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
930 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
931 be760ba8 Michael Hanselmann
932 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
933 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
934 be760ba8 Michael Hanselmann
935 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
936 be760ba8 Michael Hanselmann
           for i in range(5)]
937 be760ba8 Michael Hanselmann
938 be760ba8 Michael Hanselmann
    # Create job
939 be760ba8 Michael Hanselmann
    job_id = 17045
940 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
941 be760ba8 Michael Hanselmann
942 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
943 be760ba8 Michael Hanselmann
944 be760ba8 Michael Hanselmann
    # Mark as cancelled
945 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
946 be760ba8 Michael Hanselmann
    self.assert_(success)
947 be760ba8 Michael Hanselmann
948 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
949 ebb2a2a3 Michael Hanselmann
950 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
951 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
952 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
953 be760ba8 Michael Hanselmann
                            for op in job.ops))
954 be760ba8 Michael Hanselmann
955 66bd7445 Michael Hanselmann
    # Serialize to check for differences
956 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
957 66bd7445 Michael Hanselmann
958 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
959 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
960 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
961 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
962 30c945d0 Michael Hanselmann
963 30c945d0 Michael Hanselmann
    # Check result
964 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
965 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
966 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
967 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
968 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
969 30c945d0 Michael Hanselmann
                                for op in job.ops))
970 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
971 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
972 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
973 30c945d0 Michael Hanselmann
974 66bd7445 Michael Hanselmann
    # Must not have changed or written
975 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
976 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
977 66bd7445 Michael Hanselmann
978 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
979 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
980 30c945d0 Michael Hanselmann
981 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
982 30c945d0 Michael Hanselmann
           for i in range(5)]
983 30c945d0 Michael Hanselmann
984 30c945d0 Michael Hanselmann
    # Create job
985 30c945d0 Michael Hanselmann
    job_id = 8645
986 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
987 30c945d0 Michael Hanselmann
988 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
989 30c945d0 Michael Hanselmann
990 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
991 30c945d0 Michael Hanselmann
992 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
993 30c945d0 Michael Hanselmann
994 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
995 30c945d0 Michael Hanselmann
996 30c945d0 Michael Hanselmann
    # Mark as cancelling
997 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
998 30c945d0 Michael Hanselmann
    self.assert_(success)
999 30c945d0 Michael Hanselmann
1000 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1001 30c945d0 Michael Hanselmann
1002 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1003 30c945d0 Michael Hanselmann
                            for op in job.ops))
1004 30c945d0 Michael Hanselmann
1005 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1006 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1007 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1008 be760ba8 Michael Hanselmann
1009 be760ba8 Michael Hanselmann
    # Check result
1010 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1011 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1012 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1013 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1014 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1015 be760ba8 Michael Hanselmann
                                for op in job.ops))
1016 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1017 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1018 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1019 be760ba8 Michael Hanselmann
1020 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
1021 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1022 be760ba8 Michael Hanselmann
1023 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1024 be760ba8 Michael Hanselmann
           for i in range(5)]
1025 be760ba8 Michael Hanselmann
1026 be760ba8 Michael Hanselmann
    # Create job
1027 be760ba8 Michael Hanselmann
    job_id = 11009
1028 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1029 be760ba8 Michael Hanselmann
1030 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1031 be760ba8 Michael Hanselmann
1032 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1033 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1034 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1035 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1036 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1037 be760ba8 Michael Hanselmann
1038 be760ba8 Michael Hanselmann
      # Mark as cancelled
1039 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
1040 be760ba8 Michael Hanselmann
      self.assert_(success)
1041 be760ba8 Michael Hanselmann
1042 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1043 be760ba8 Michael Hanselmann
                              for op in job.ops))
1044 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1045 be760ba8 Michael Hanselmann
1046 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1047 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1048 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1049 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1050 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1051 be760ba8 Michael Hanselmann
1052 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1053 be760ba8 Michael Hanselmann
1054 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1055 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1056 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1057 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1058 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1059 be760ba8 Michael Hanselmann
1060 be760ba8 Michael Hanselmann
    # Check result
1061 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1062 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1063 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
1064 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1065 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1066 be760ba8 Michael Hanselmann
                                for op in job.ops))
1067 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1068 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1069 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1070 be760ba8 Michael Hanselmann
1071 942e2262 Michael Hanselmann
  def _TestCancelWhileSomething(self, cb):
1072 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
1073 9e49dfc5 Michael Hanselmann
1074 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1075 9e49dfc5 Michael Hanselmann
           for i in range(5)]
1076 9e49dfc5 Michael Hanselmann
1077 9e49dfc5 Michael Hanselmann
    # Create job
1078 9e49dfc5 Michael Hanselmann
    job_id = 24314
1079 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1080 9e49dfc5 Michael Hanselmann
1081 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1082 9e49dfc5 Michael Hanselmann
1083 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1084 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1085 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1086 9e49dfc5 Michael Hanselmann
1087 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
1088 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
1089 9e49dfc5 Michael Hanselmann
      self.assert_(success)
1090 9e49dfc5 Michael Hanselmann
1091 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1092 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
1093 9e49dfc5 Michael Hanselmann
1094 942e2262 Michael Hanselmann
      cb(queue)
1095 9e49dfc5 Michael Hanselmann
1096 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
1097 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
1098 9e49dfc5 Michael Hanselmann
1099 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1100 9e49dfc5 Michael Hanselmann
1101 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1102 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1103 9e49dfc5 Michael Hanselmann
1104 9e49dfc5 Michael Hanselmann
    # Check result
1105 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1106 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1107 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
1108 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
1109 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1110 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
1111 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1112 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1113 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1114 9e49dfc5 Michael Hanselmann
1115 942e2262 Michael Hanselmann
    return queue
1116 942e2262 Michael Hanselmann
1117 942e2262 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
1118 942e2262 Michael Hanselmann
    def fn(_):
1119 942e2262 Michael Hanselmann
      # Fake an acquire attempt timing out
1120 942e2262 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1121 942e2262 Michael Hanselmann
1122 942e2262 Michael Hanselmann
    self._TestCancelWhileSomething(fn)
1123 942e2262 Michael Hanselmann
1124 942e2262 Michael Hanselmann
  def testCancelDuringQueueShutdown(self):
1125 942e2262 Michael Hanselmann
    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1126 942e2262 Michael Hanselmann
    self.assertFalse(queue.AcceptingJobsUnlocked())
1127 942e2262 Michael Hanselmann
1128 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
1129 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
1130 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1131 be760ba8 Michael Hanselmann
1132 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1133 be760ba8 Michael Hanselmann
           for i in range(3)]
1134 be760ba8 Michael Hanselmann
1135 be760ba8 Michael Hanselmann
    # Create job
1136 76b62028 Iustin Pop
    job_id = 28492
1137 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1138 be760ba8 Michael Hanselmann
1139 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1140 be760ba8 Michael Hanselmann
1141 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1142 be760ba8 Michael Hanselmann
1143 be760ba8 Michael Hanselmann
    # Run one opcode
1144 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1145 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1146 be760ba8 Michael Hanselmann
1147 be760ba8 Michael Hanselmann
    # Job goes back to queued
1148 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1149 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1150 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1151 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1152 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1153 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
1154 be760ba8 Michael Hanselmann
1155 be760ba8 Michael Hanselmann
    # Mark as cancelled
1156 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
1157 be760ba8 Michael Hanselmann
    self.assert_(success)
1158 be760ba8 Michael Hanselmann
1159 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
1160 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1161 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1162 be760ba8 Michael Hanselmann
1163 be760ba8 Michael Hanselmann
    # Check result
1164 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1165 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
1166 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1167 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1168 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1169 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1170 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1171 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1172 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
1173 be760ba8 Michael Hanselmann
1174 942e2262 Michael Hanselmann
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
1175 942e2262 Michael Hanselmann
    self.assertTrue(queue.AcceptingJobsUnlocked())
1176 942e2262 Michael Hanselmann
1177 942e2262 Michael Hanselmann
    # Simulate shutdown
1178 942e2262 Michael Hanselmann
    queue.StopAcceptingJobs()
1179 942e2262 Michael Hanselmann
1180 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1181 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1182 942e2262 Michael Hanselmann
1183 942e2262 Michael Hanselmann
    # Check result
1184 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1185 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1186 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1187 942e2262 Michael Hanselmann
    self.assertTrue(job.start_timestamp)
1188 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1189 942e2262 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1190 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1191 942e2262 Michael Hanselmann
                               for op in job.ops[:runcount]))
1192 942e2262 Michael Hanselmann
    self.assertFalse(job.ops[runcount].end_timestamp)
1193 942e2262 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1194 942e2262 Michael Hanselmann
                                for op in job.ops[(runcount + 1):]))
1195 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1196 942e2262 Michael Hanselmann
                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
1197 942e2262 Michael Hanselmann
                       ([constants.OP_STATUS_QUEUED] *
1198 942e2262 Michael Hanselmann
                        (len(job.ops) - runcount))),
1199 942e2262 Michael Hanselmann
                      (["Res%s" % i for i in range(runcount)] +
1200 942e2262 Michael Hanselmann
                       ([None] * (len(job.ops) - runcount)))])
1201 942e2262 Michael Hanselmann
1202 942e2262 Michael Hanselmann
    # Must have been written and replicated
1203 942e2262 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1204 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1205 942e2262 Michael Hanselmann
1206 942e2262 Michael Hanselmann
  def testQueueShutdownWhileRunning(self):
1207 942e2262 Michael Hanselmann
    # Tests shutting down the queue while a job is running
1208 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1209 942e2262 Michael Hanselmann
1210 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1211 942e2262 Michael Hanselmann
           for i in range(3)]
1212 942e2262 Michael Hanselmann
1213 942e2262 Michael Hanselmann
    # Create job
1214 942e2262 Michael Hanselmann
    job_id = 2718211587
1215 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1216 942e2262 Michael Hanselmann
1217 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1218 942e2262 Michael Hanselmann
1219 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1220 942e2262 Michael Hanselmann
1221 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1222 942e2262 Michael Hanselmann
1223 942e2262 Michael Hanselmann
    # Run one opcode
1224 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1225 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1226 942e2262 Michael Hanselmann
1227 942e2262 Michael Hanselmann
    # Job goes back to queued
1228 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1229 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1230 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1231 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1232 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1233 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1234 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1235 942e2262 Michael Hanselmann
1236 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1237 942e2262 Michael Hanselmann
    for _ in range(3):
1238 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1239 942e2262 Michael Hanselmann
1240 942e2262 Michael Hanselmann
    # Run second opcode
1241 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1242 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1243 942e2262 Michael Hanselmann
1244 942e2262 Michael Hanselmann
    # Job goes back to queued
1245 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1246 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1247 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1248 942e2262 Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
1249 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1250 942e2262 Michael Hanselmann
                      ["Res0", "Res1", None]])
1251 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1252 942e2262 Michael Hanselmann
1253 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1254 942e2262 Michael Hanselmann
    for _ in range(3):
1255 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1256 942e2262 Michael Hanselmann
1257 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 2)
1258 942e2262 Michael Hanselmann
1259 942e2262 Michael Hanselmann
  def testQueueShutdownWithLockTimeout(self):
1260 942e2262 Michael Hanselmann
    # Tests shutting down while a lock acquire times out
1261 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1262 942e2262 Michael Hanselmann
1263 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1264 942e2262 Michael Hanselmann
           for i in range(3)]
1265 942e2262 Michael Hanselmann
1266 942e2262 Michael Hanselmann
    # Create job
1267 942e2262 Michael Hanselmann
    job_id = 1304231178
1268 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1269 942e2262 Michael Hanselmann
1270 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1271 942e2262 Michael Hanselmann
1272 942e2262 Michael Hanselmann
    acquire_timeout = False
1273 942e2262 Michael Hanselmann
1274 942e2262 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1275 942e2262 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1276 942e2262 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1277 942e2262 Michael Hanselmann
      if acquire_timeout:
1278 942e2262 Michael Hanselmann
        raise mcpu.LockAcquireTimeout()
1279 942e2262 Michael Hanselmann
1280 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1281 942e2262 Michael Hanselmann
1282 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1283 942e2262 Michael Hanselmann
1284 942e2262 Michael Hanselmann
    # Run one opcode
1285 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1286 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1287 942e2262 Michael Hanselmann
1288 942e2262 Michael Hanselmann
    # Job goes back to queued
1289 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1290 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1291 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1292 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1293 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1294 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1295 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1296 942e2262 Michael Hanselmann
1297 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1298 942e2262 Michael Hanselmann
    for _ in range(3):
1299 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1300 942e2262 Michael Hanselmann
1301 942e2262 Michael Hanselmann
    # The next opcode should have expiring lock acquires
1302 942e2262 Michael Hanselmann
    acquire_timeout = True
1303 942e2262 Michael Hanselmann
1304 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 1)
1305 942e2262 Michael Hanselmann
1306 942e2262 Michael Hanselmann
  def testQueueShutdownWhileInQueue(self):
1307 942e2262 Michael Hanselmann
    # This should never happen in reality (no new jobs are started by the
1308 942e2262 Michael Hanselmann
    # workerpool once a shutdown has been initiated), but it's better to test
1309 942e2262 Michael Hanselmann
    # the job processor for this scenario
1310 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1311 942e2262 Michael Hanselmann
1312 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1313 942e2262 Michael Hanselmann
           for i in range(5)]
1314 942e2262 Michael Hanselmann
1315 942e2262 Michael Hanselmann
    # Create job
1316 942e2262 Michael Hanselmann
    job_id = 2031
1317 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1318 942e2262 Michael Hanselmann
1319 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1320 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1321 942e2262 Michael Hanselmann
1322 942e2262 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1323 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1324 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1325 942e2262 Michael Hanselmann
                               for op in job.ops))
1326 942e2262 Michael Hanselmann
1327 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1328 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1329 942e2262 Michael Hanselmann
1330 942e2262 Michael Hanselmann
  def testQueueShutdownWhileWaitlockInQueue(self):
1331 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1332 942e2262 Michael Hanselmann
1333 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1334 942e2262 Michael Hanselmann
           for i in range(5)]
1335 942e2262 Michael Hanselmann
1336 942e2262 Michael Hanselmann
    # Create job
1337 942e2262 Michael Hanselmann
    job_id = 53125685
1338 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1339 942e2262 Michael Hanselmann
1340 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1341 942e2262 Michael Hanselmann
1342 942e2262 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1343 942e2262 Michael Hanselmann
1344 942e2262 Michael Hanselmann
    assert len(job.ops) == 5
1345 942e2262 Michael Hanselmann
1346 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1347 942e2262 Michael Hanselmann
1348 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1349 942e2262 Michael Hanselmann
1350 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1351 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1352 942e2262 Michael Hanselmann
1353 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
1354 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
1355 be760ba8 Michael Hanselmann
    # program was restarted
1356 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1357 be760ba8 Michael Hanselmann
1358 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1359 be760ba8 Michael Hanselmann
1360 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1361 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1362 be760ba8 Michael Hanselmann
             for i in range(10)]
1363 be760ba8 Michael Hanselmann
1364 be760ba8 Michael Hanselmann
      # Create job
1365 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
1366 be760ba8 Michael Hanselmann
1367 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1368 be760ba8 Michael Hanselmann
1369 be760ba8 Michael Hanselmann
      for _ in range(successcount):
1370 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1371 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
1372 be760ba8 Michael Hanselmann
1373 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1374 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
1375 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
1376 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
1377 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
1378 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
1379 be760ba8 Michael Hanselmann
1380 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
1381 be760ba8 Michael Hanselmann
1382 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
1383 8a3cd185 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1384 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
1385 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
1386 be760ba8 Michael Hanselmann
1387 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1388 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1389 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1390 be760ba8 Michael Hanselmann
1391 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1392 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1393 be760ba8 Michael Hanselmann
1394 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1395 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1396 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1397 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1398 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1399 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1400 be760ba8 Michael Hanselmann
1401 be760ba8 Michael Hanselmann
      if remaining == 0:
1402 be760ba8 Michael Hanselmann
        # Last opcode
1403 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1404 be760ba8 Michael Hanselmann
        break
1405 be760ba8 Michael Hanselmann
1406 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1407 be760ba8 Michael Hanselmann
1408 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1409 be760ba8 Michael Hanselmann
1410 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1411 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1412 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1413 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1414 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1415 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1416 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1417 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1418 be760ba8 Michael Hanselmann
                            for op in job.ops))
1419 be760ba8 Michael Hanselmann
1420 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1421 be760ba8 Michael Hanselmann
1422 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1423 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1424 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1425 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1426 be760ba8 Michael Hanselmann
1427 be760ba8 Michael Hanselmann
    # ... also after being restored
1428 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1429 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1430 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1431 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1432 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1433 be760ba8 Michael Hanselmann
1434 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1435 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1436 be760ba8 Michael Hanselmann
1437 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1438 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1439 be760ba8 Michael Hanselmann
1440 be760ba8 Michael Hanselmann
    # Create job
1441 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1442 be760ba8 Michael Hanselmann
1443 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1444 be760ba8 Michael Hanselmann
1445 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1446 be760ba8 Michael Hanselmann
1447 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1448 be760ba8 Michael Hanselmann
1449 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1450 be760ba8 Michael Hanselmann
1451 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1452 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1453 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1454 be760ba8 Michael Hanselmann
1455 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1456 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1457 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1458 be760ba8 Michael Hanselmann
1459 be760ba8 Michael Hanselmann
    messages = {
1460 be760ba8 Michael Hanselmann
      1: [
1461 be760ba8 Michael Hanselmann
        (None, "Hello"),
1462 be760ba8 Michael Hanselmann
        (None, "World"),
1463 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1464 be760ba8 Michael Hanselmann
        ],
1465 be760ba8 Michael Hanselmann
      4: [
1466 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1467 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1468 be760ba8 Michael Hanselmann
        ],
1469 be760ba8 Michael Hanselmann
      }
1470 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1471 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1472 be760ba8 Michael Hanselmann
           for i in range(5)]
1473 be760ba8 Michael Hanselmann
1474 be760ba8 Michael Hanselmann
    # Create job
1475 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1476 be760ba8 Michael Hanselmann
1477 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1478 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1479 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1480 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1481 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1482 be760ba8 Michael Hanselmann
1483 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1484 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1485 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1486 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1487 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1488 be760ba8 Michael Hanselmann
1489 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1490 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1491 be760ba8 Michael Hanselmann
1492 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1493 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1494 be760ba8 Michael Hanselmann
        if log_type:
1495 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1496 be760ba8 Michael Hanselmann
        else:
1497 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1498 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1499 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1500 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1501 be760ba8 Michael Hanselmann
1502 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1503 be760ba8 Michael Hanselmann
1504 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1505 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1506 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1507 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1508 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1509 be760ba8 Michael Hanselmann
1510 be760ba8 Michael Hanselmann
      if remaining == 0:
1511 be760ba8 Michael Hanselmann
        # Last opcode
1512 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1513 be760ba8 Michael Hanselmann
        break
1514 be760ba8 Michael Hanselmann
1515 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1516 be760ba8 Michael Hanselmann
1517 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1518 be760ba8 Michael Hanselmann
1519 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1520 ebb2a2a3 Michael Hanselmann
1521 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1522 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1523 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1524 be760ba8 Michael Hanselmann
1525 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1526 be760ba8 Michael Hanselmann
1527 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1528 be760ba8 Michael Hanselmann
1529 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1530 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1531 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1532 be760ba8 Michael Hanselmann
1533 be760ba8 Michael Hanselmann
    # Check each message
1534 be760ba8 Michael Hanselmann
    prevserial = -1
1535 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1536 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1537 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1538 be760ba8 Michael Hanselmann
        if exptype:
1539 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1540 be760ba8 Michael Hanselmann
        else:
1541 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1542 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1543 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1544 be760ba8 Michael Hanselmann
        prevserial = serial
1545 be760ba8 Michael Hanselmann
1546 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1547 be760ba8 Michael Hanselmann
    # Check serial
1548 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1549 be760ba8 Michael Hanselmann
1550 be760ba8 Michael Hanselmann
    # No filter
1551 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1552 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1553 be760ba8 Michael Hanselmann
                      for entry in entries])
1554 be760ba8 Michael Hanselmann
1555 be760ba8 Michael Hanselmann
    # Filter with serial
1556 be760ba8 Michael Hanselmann
    assert count > 3
1557 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1558 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1559 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1560 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1561 be760ba8 Michael Hanselmann
1562 be760ba8 Michael Hanselmann
    # No log message after highest serial
1563 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1564 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1565 be760ba8 Michael Hanselmann
1566 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1567 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1568 6a373640 Michael Hanselmann
1569 6a373640 Michael Hanselmann
    job_id = 15656
1570 6a373640 Michael Hanselmann
    ops = [
1571 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1572 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1573 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1574 6a373640 Michael Hanselmann
                          submit_jobs=[
1575 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1576 6a373640 Michael Hanselmann
                            ]),
1577 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1578 6a373640 Michael Hanselmann
                          submit_jobs=[
1579 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1580 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1581 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1582 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1583 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1584 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1585 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1586 6a373640 Michael Hanselmann
                            ]),
1587 6a373640 Michael Hanselmann
      ]
1588 6a373640 Michael Hanselmann
1589 6a373640 Michael Hanselmann
    # Create job
1590 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1591 6a373640 Michael Hanselmann
1592 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1593 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1594 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1595 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1596 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1597 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1598 6a373640 Michael Hanselmann
1599 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1600 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1601 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1602 6a373640 Michael Hanselmann
1603 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1604 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1605 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1606 6a373640 Michael Hanselmann
1607 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1608 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1609 6a373640 Michael Hanselmann
      self.assertFalse(success)
1610 6a373640 Michael Hanselmann
1611 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1612 6a373640 Michael Hanselmann
1613 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1614 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1615 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1616 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1617 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1618 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1619 6a373640 Michael Hanselmann
        # Last opcode
1620 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1621 6a373640 Michael Hanselmann
      else:
1622 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1623 6a373640 Michael Hanselmann
1624 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1625 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1626 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1627 6a373640 Michael Hanselmann
1628 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1629 6a373640 Michael Hanselmann
1630 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1631 6a373640 Michael Hanselmann
                                        for op in ops
1632 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1633 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1634 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1635 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1636 6a373640 Michael Hanselmann
1637 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1638 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1639 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1640 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1641 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1642 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1643 6a373640 Michael Hanselmann
1644 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1645 6a373640 Michael Hanselmann
1646 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1647 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1648 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1649 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1650 6a373640 Michael Hanselmann
1651 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1652 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1653 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1654 b95479a5 Michael Hanselmann
1655 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1656 b95479a5 Michael Hanselmann
1657 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1658 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1659 b95479a5 Michael Hanselmann
    job_id = 29929
1660 b95479a5 Michael Hanselmann
    ops = [
1661 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1662 b95479a5 Michael Hanselmann
                          depends=[
1663 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1664 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1665 b95479a5 Michael Hanselmann
                            ]),
1666 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1667 b95479a5 Michael Hanselmann
      ]
1668 b95479a5 Michael Hanselmann
1669 b95479a5 Michael Hanselmann
    # Create job
1670 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1671 b95479a5 Michael Hanselmann
1672 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1673 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1674 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1675 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1676 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1677 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1678 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1679 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1680 b95479a5 Michael Hanselmann
1681 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1682 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1683 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1684 b95479a5 Michael Hanselmann
1685 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1686 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1687 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1688 b95479a5 Michael Hanselmann
1689 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1690 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1691 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1692 b95479a5 Michael Hanselmann
1693 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1694 b95479a5 Michael Hanselmann
1695 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1696 b95479a5 Michael Hanselmann
1697 b95479a5 Michael Hanselmann
    counter = itertools.count()
1698 b95479a5 Michael Hanselmann
    while True:
1699 b95479a5 Michael Hanselmann
      attempt = counter.next()
1700 b95479a5 Michael Hanselmann
1701 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1702 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1703 b95479a5 Michael Hanselmann
1704 b95479a5 Michael Hanselmann
      if attempt < 2:
1705 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1706 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1707 b95479a5 Michael Hanselmann
      elif attempt == 2:
1708 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1709 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1710 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1711 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1712 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1713 b95479a5 Michael Hanselmann
      elif attempt < 5:
1714 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1715 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1716 b95479a5 Michael Hanselmann
      elif attempt == 5:
1717 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1718 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1719 b95479a5 Michael Hanselmann
      if attempt == 2:
1720 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1721 b95479a5 Michael Hanselmann
      elif attempt > 5:
1722 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1723 b95479a5 Michael Hanselmann
      else:
1724 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1725 b95479a5 Michael Hanselmann
1726 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1727 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1728 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1729 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1730 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1731 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1732 b95479a5 Michael Hanselmann
1733 b95479a5 Michael Hanselmann
      if attempt < 5:
1734 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1735 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1736 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1737 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1738 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1739 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1740 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1741 b95479a5 Michael Hanselmann
        continue
1742 b95479a5 Michael Hanselmann
1743 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1744 b95479a5 Michael Hanselmann
        # Last opcode
1745 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1746 b95479a5 Michael Hanselmann
        break
1747 b95479a5 Michael Hanselmann
1748 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1749 b95479a5 Michael Hanselmann
1750 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1751 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1752 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1753 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1754 b95479a5 Michael Hanselmann
1755 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1756 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1757 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1758 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1759 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1760 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1761 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1762 b95479a5 Michael Hanselmann
                               for op in job.ops))
1763 b95479a5 Michael Hanselmann
1764 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1765 b95479a5 Michael Hanselmann
1766 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1767 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1768 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1769 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1770 b95479a5 Michael Hanselmann
1771 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1772 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1773 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1774 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1775 b95479a5 Michael Hanselmann
1776 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1777 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1778 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1779 b95479a5 Michael Hanselmann
1780 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1781 b95479a5 Michael Hanselmann
1782 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1783 b95479a5 Michael Hanselmann
    job_id = 30876
1784 b95479a5 Michael Hanselmann
    ops = [
1785 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1786 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1787 b95479a5 Michael Hanselmann
                          depends=[
1788 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1789 b95479a5 Michael Hanselmann
                            ]),
1790 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1791 b95479a5 Michael Hanselmann
      ]
1792 b95479a5 Michael Hanselmann
1793 b95479a5 Michael Hanselmann
    # Create job
1794 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1795 b95479a5 Michael Hanselmann
1796 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1797 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1798 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1799 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1800 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1801 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1802 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1803 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1804 b95479a5 Michael Hanselmann
1805 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1806 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1807 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1808 b95479a5 Michael Hanselmann
1809 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1810 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1811 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1812 b95479a5 Michael Hanselmann
1813 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1814 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1815 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1816 b95479a5 Michael Hanselmann
1817 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1818 b95479a5 Michael Hanselmann
1819 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1820 b95479a5 Michael Hanselmann
1821 b95479a5 Michael Hanselmann
    counter = itertools.count()
1822 b95479a5 Michael Hanselmann
    while True:
1823 b95479a5 Michael Hanselmann
      attempt = counter.next()
1824 b95479a5 Michael Hanselmann
1825 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1826 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1827 b95479a5 Michael Hanselmann
1828 b95479a5 Michael Hanselmann
      if attempt == 0:
1829 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1830 b95479a5 Michael Hanselmann
        pass
1831 b95479a5 Michael Hanselmann
      elif attempt < 4:
1832 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1833 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1834 b95479a5 Michael Hanselmann
      elif attempt == 4:
1835 b95479a5 Michael Hanselmann
        # Other job was cancelled
1836 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1837 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1838 b95479a5 Michael Hanselmann
1839 b95479a5 Michael Hanselmann
      if attempt == 0:
1840 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1841 b95479a5 Michael Hanselmann
      else:
1842 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1843 b95479a5 Michael Hanselmann
1844 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1845 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1846 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1847 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1848 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1849 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1850 b95479a5 Michael Hanselmann
1851 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1852 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1853 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1854 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1855 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1856 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1857 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1858 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1859 b95479a5 Michael Hanselmann
        continue
1860 b95479a5 Michael Hanselmann
1861 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1862 b95479a5 Michael Hanselmann
        # Last opcode
1863 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1864 b95479a5 Michael Hanselmann
        break
1865 b95479a5 Michael Hanselmann
1866 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1867 b95479a5 Michael Hanselmann
1868 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1869 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1870 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1871 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1872 b95479a5 Michael Hanselmann
1873 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1874 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1875 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1876 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1877 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1878 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1879 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1880 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1881 b95479a5 Michael Hanselmann
1882 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1883 b95479a5 Michael Hanselmann
1884 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1885 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1886 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1887 b95479a5 Michael Hanselmann
1888 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1889 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1890 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1891 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1892 b95479a5 Michael Hanselmann
1893 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1894 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1895 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1896 b95479a5 Michael Hanselmann
1897 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1898 b95479a5 Michael Hanselmann
1899 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1900 b95479a5 Michael Hanselmann
    job_id = 11763
1901 b95479a5 Michael Hanselmann
    ops = [
1902 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1903 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1904 b95479a5 Michael Hanselmann
                          depends=[
1905 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1906 b95479a5 Michael Hanselmann
                            ]),
1907 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1908 b95479a5 Michael Hanselmann
      ]
1909 b95479a5 Michael Hanselmann
1910 b95479a5 Michael Hanselmann
    # Create job
1911 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1912 b95479a5 Michael Hanselmann
1913 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1914 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1915 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1916 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1917 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1918 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1919 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1920 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1921 b95479a5 Michael Hanselmann
1922 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1923 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1924 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1925 b95479a5 Michael Hanselmann
1926 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1927 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1928 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1929 b95479a5 Michael Hanselmann
1930 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1931 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1932 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1933 b95479a5 Michael Hanselmann
1934 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1935 b95479a5 Michael Hanselmann
1936 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1937 b95479a5 Michael Hanselmann
1938 b95479a5 Michael Hanselmann
    counter = itertools.count()
1939 b95479a5 Michael Hanselmann
    while True:
1940 b95479a5 Michael Hanselmann
      attempt = counter.next()
1941 b95479a5 Michael Hanselmann
1942 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1943 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1944 b95479a5 Michael Hanselmann
1945 b95479a5 Michael Hanselmann
      if attempt == 0:
1946 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1947 b95479a5 Michael Hanselmann
        pass
1948 b95479a5 Michael Hanselmann
      elif attempt < 4:
1949 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1950 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1951 b95479a5 Michael Hanselmann
      elif attempt == 4:
1952 b95479a5 Michael Hanselmann
        # Other job failed
1953 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1954 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1955 b95479a5 Michael Hanselmann
1956 b95479a5 Michael Hanselmann
      if attempt == 0:
1957 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1958 b95479a5 Michael Hanselmann
      else:
1959 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1960 b95479a5 Michael Hanselmann
1961 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1962 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1963 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1964 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1965 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1966 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1967 b95479a5 Michael Hanselmann
1968 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1969 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1970 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1971 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1972 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1973 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1974 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1975 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1976 b95479a5 Michael Hanselmann
        continue
1977 b95479a5 Michael Hanselmann
1978 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1979 b95479a5 Michael Hanselmann
        # Last opcode
1980 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1981 b95479a5 Michael Hanselmann
        break
1982 b95479a5 Michael Hanselmann
1983 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1984 b95479a5 Michael Hanselmann
1985 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1986 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1987 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1988 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1989 b95479a5 Michael Hanselmann
1990 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1991 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1992 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1993 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1994 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
1995 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
1996 b95479a5 Michael Hanselmann
1997 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
1998 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
1999 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
2000 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
2001 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
2002 b95479a5 Michael Hanselmann
2003 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
2004 b95479a5 Michael Hanselmann
2005 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2006 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2007 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
2008 b95479a5 Michael Hanselmann
2009 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2010 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2011 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2012 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2013 b95479a5 Michael Hanselmann
2014 be760ba8 Michael Hanselmann
2015 df5a5730 Michael Hanselmann
class TestEvaluateJobProcessorResult(unittest.TestCase):
2016 df5a5730 Michael Hanselmann
  def testFinished(self):
2017 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2018 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(30953)
2019 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2020 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.FINISHED)
2021 df5a5730 Michael Hanselmann
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2022 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2023 df5a5730 Michael Hanselmann
2024 df5a5730 Michael Hanselmann
  def testDefer(self):
2025 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2026 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(11326, priority=5463)
2027 df5a5730 Michael Hanselmann
    try:
2028 df5a5730 Michael Hanselmann
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2029 df5a5730 Michael Hanselmann
                                         jqueue._JobProcessor.DEFER)
2030 df5a5730 Michael Hanselmann
    except workerpool.DeferTask, err:
2031 df5a5730 Michael Hanselmann
      self.assertEqual(err.priority, 5463)
2032 df5a5730 Michael Hanselmann
    else:
2033 df5a5730 Michael Hanselmann
      self.fail("Didn't raise exception")
2034 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2035 df5a5730 Michael Hanselmann
2036 df5a5730 Michael Hanselmann
  def testWaitdep(self):
2037 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2038 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(21317)
2039 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2040 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.WAITDEP)
2041 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2042 df5a5730 Michael Hanselmann
2043 df5a5730 Michael Hanselmann
  def testOther(self):
2044 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2045 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(5813)
2046 df5a5730 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
2047 df5a5730 Michael Hanselmann
                      jqueue._EvaluateJobProcessorResult,
2048 df5a5730 Michael Hanselmann
                      depmgr, job, "Other result")
2049 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2050 df5a5730 Michael Hanselmann
2051 df5a5730 Michael Hanselmann
2052 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
2053 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
2054 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
2055 26d3fd2f Michael Hanselmann
    self.attempts = 0
2056 26d3fd2f Michael Hanselmann
    self.last_timeout = None
2057 26d3fd2f Michael Hanselmann
2058 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
2059 26d3fd2f Michael Hanselmann
    self.attempts += 1
2060 26d3fd2f Michael Hanselmann
    if self.timeouts:
2061 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
2062 26d3fd2f Michael Hanselmann
    else:
2063 26d3fd2f Michael Hanselmann
      timeout = None
2064 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
2065 26d3fd2f Michael Hanselmann
    return timeout
2066 26d3fd2f Michael Hanselmann
2067 26d3fd2f Michael Hanselmann
2068 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2069 26d3fd2f Michael Hanselmann
  def setUp(self):
2070 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
2071 26d3fd2f Michael Hanselmann
    self.job = None
2072 26d3fd2f Michael Hanselmann
    self.curop = None
2073 26d3fd2f Michael Hanselmann
    self.opcounter = None
2074 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
2075 26d3fd2f Michael Hanselmann
    self.retries = 0
2076 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
2077 26d3fd2f Michael Hanselmann
    self.prev_prio = None
2078 5fd6b694 Michael Hanselmann
    self.prev_status = None
2079 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
2080 26d3fd2f Michael Hanselmann
    self.gave_lock = None
2081 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
2082 26d3fd2f Michael Hanselmann
2083 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2084 26d3fd2f Michael Hanselmann
    job = self.job
2085 26d3fd2f Michael Hanselmann
2086 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
2087 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
2088 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2089 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2090 5fd6b694 Michael Hanselmann
2091 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2092 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2093 26d3fd2f Michael Hanselmann
2094 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
2095 26d3fd2f Michael Hanselmann
2096 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
2097 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
2098 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
2099 26d3fd2f Michael Hanselmann
2100 26d3fd2f Michael Hanselmann
    self.gave_lock = True
2101 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
2102 26d3fd2f Michael Hanselmann
2103 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
2104 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2105 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
2106 26d3fd2f Michael Hanselmann
      assert self.retries == 7
2107 26d3fd2f Michael Hanselmann
      self.retries = 0
2108 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
2109 26d3fd2f Michael Hanselmann
      return
2110 26d3fd2f Michael Hanselmann
2111 26d3fd2f Michael Hanselmann
    if self.retries > 0:
2112 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
2113 26d3fd2f Michael Hanselmann
      self.retries -= 1
2114 26d3fd2f Michael Hanselmann
      self.gave_lock = False
2115 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
2116 26d3fd2f Michael Hanselmann
2117 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2118 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2119 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
2120 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
2121 26d3fd2f Michael Hanselmann
2122 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
2123 26d3fd2f Michael Hanselmann
    job = self.job
2124 26d3fd2f Michael Hanselmann
2125 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
2126 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2127 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2128 5fd6b694 Michael Hanselmann
2129 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2130 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2131 26d3fd2f Michael Hanselmann
2132 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
2133 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
2134 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
2135 26d3fd2f Michael Hanselmann
2136 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
2137 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
2138 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
2139 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
2140 26d3fd2f Michael Hanselmann
2141 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
2142 26d3fd2f Michael Hanselmann
    job = self.job
2143 26d3fd2f Michael Hanselmann
2144 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
2145 26d3fd2f Michael Hanselmann
2146 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
2147 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
2148 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2149 26d3fd2f Michael Hanselmann
2150 26d3fd2f Michael Hanselmann
    if self.curop == 1:
2151 26d3fd2f Michael Hanselmann
      # Normal retry
2152 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2153 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
2154 26d3fd2f Michael Hanselmann
2155 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
2156 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
2157 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
2158 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2159 26d3fd2f Michael Hanselmann
2160 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
2161 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
2162 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
2163 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2164 26d3fd2f Michael Hanselmann
2165 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
2166 26d3fd2f Michael Hanselmann
2167 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
2168 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
2169 26d3fd2f Michael Hanselmann
2170 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
2171 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2172 26d3fd2f Michael Hanselmann
      self.retries = 0
2173 26d3fd2f Michael Hanselmann
2174 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
2175 26d3fd2f Michael Hanselmann
      # Normal retry
2176 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
2177 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2178 26d3fd2f Michael Hanselmann
2179 26d3fd2f Michael Hanselmann
    else:
2180 26d3fd2f Michael Hanselmann
      timeouts = []
2181 26d3fd2f Michael Hanselmann
      self.retries = 0
2182 26d3fd2f Michael Hanselmann
2183 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
2184 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
2185 26d3fd2f Michael Hanselmann
2186 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
2187 26d3fd2f Michael Hanselmann
2188 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
2189 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
2190 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
2191 26d3fd2f Michael Hanselmann
2192 26d3fd2f Michael Hanselmann
    return ts
2193 26d3fd2f Michael Hanselmann
2194 26d3fd2f Michael Hanselmann
  def testTimeout(self):
2195 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2196 26d3fd2f Michael Hanselmann
           for i in range(10)]
2197 26d3fd2f Michael Hanselmann
2198 26d3fd2f Michael Hanselmann
    # Create job
2199 26d3fd2f Michael Hanselmann
    job_id = 15801
2200 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2201 26d3fd2f Michael Hanselmann
    self.job = job
2202 26d3fd2f Michael Hanselmann
2203 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
2204 26d3fd2f Michael Hanselmann
2205 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2206 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
2207 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
2208 26d3fd2f Michael Hanselmann
2209 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
2210 26d3fd2f Michael Hanselmann
2211 5fd6b694 Michael Hanselmann
    while True:
2212 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
2213 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
2214 26d3fd2f Michael Hanselmann
2215 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2216 5fd6b694 Michael Hanselmann
2217 5fd6b694 Michael Hanselmann
      if self.curop is not None:
2218 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
2219 5fd6b694 Michael Hanselmann
2220 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
2221 5fd6b694 Michael Hanselmann
2222 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
2223 5fd6b694 Michael Hanselmann
      assert self.curop is not None
2224 5fd6b694 Michael Hanselmann
2225 3c631ea2 Michael Hanselmann
      # Input priority should never be set or modified
2226 3c631ea2 Michael Hanselmann
      self.assertFalse(compat.any(hasattr(op.input, "priority")
2227 3c631ea2 Michael Hanselmann
                                  for op in job.ops))
2228 3c631ea2 Michael Hanselmann
2229 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2230 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
2231 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2232 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2233 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
2234 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2235 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
2236 5fd6b694 Michael Hanselmann
2237 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2238 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2239 26d3fd2f Michael Hanselmann
        break
2240 26d3fd2f Michael Hanselmann
2241 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2242 26d3fd2f Michael Hanselmann
2243 5fd6b694 Michael Hanselmann
      if self.curop == 0:
2244 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
2245 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
2246 5fd6b694 Michael Hanselmann
2247 26d3fd2f Michael Hanselmann
      if self.gave_lock:
2248 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
2249 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2250 26d3fd2f Michael Hanselmann
      else:
2251 5fd6b694 Michael Hanselmann
        # Did not get locks
2252 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
2253 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2254 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
2255 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
2256 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2257 5fd6b694 Michael Hanselmann
2258 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
2259 5fd6b694 Michael Hanselmann
        # updated
2260 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
2261 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2262 5fd6b694 Michael Hanselmann
2263 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2264 26d3fd2f Michael Hanselmann
2265 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
2266 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2267 26d3fd2f Michael Hanselmann
2268 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
2269 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
2270 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
2271 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
2272 26d3fd2f Michael Hanselmann
2273 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2274 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2275 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2276 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
2277 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
2278 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2279 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2280 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2281 26d3fd2f Michael Hanselmann
                            for op in job.ops))
2282 26d3fd2f Michael Hanselmann
2283 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2284 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2285 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2286 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2287 26d3fd2f Michael Hanselmann
2288 26d3fd2f Michael Hanselmann
2289 4679547e Michael Hanselmann
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2290 4679547e Michael Hanselmann
  def setUp(self):
2291 4679547e Michael Hanselmann
    self.queue = _FakeQueueForProc()
2292 4679547e Michael Hanselmann
    self.opexecprio = []
2293 4679547e Michael Hanselmann
2294 4679547e Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2295 4679547e Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2296 4679547e Michael Hanselmann
    self.opexecprio.append(priority)
2297 4679547e Michael Hanselmann
2298 4679547e Michael Hanselmann
  def testChangePriorityWhileRunning(self):
2299 4679547e Michael Hanselmann
    # Tests changing the priority on a job while it has finished opcodes
2300 4679547e Michael Hanselmann
    # (successful) and more, unprocessed ones
2301 4679547e Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2302 4679547e Michael Hanselmann
           for i in range(3)]
2303 4679547e Michael Hanselmann
2304 4679547e Michael Hanselmann
    # Create job
2305 4679547e Michael Hanselmann
    job_id = 3499
2306 4679547e Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2307 4679547e Michael Hanselmann
2308 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2309 4679547e Michael Hanselmann
2310 4679547e Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2311 4679547e Michael Hanselmann
2312 4679547e Michael Hanselmann
    # Run first opcode
2313 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2314 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2315 4679547e Michael Hanselmann
2316 4679547e Michael Hanselmann
    # Job goes back to queued
2317 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2318 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2319 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2320 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2321 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
2322 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2323 4679547e Michael Hanselmann
                      ["Res0", None, None]])
2324 4679547e Michael Hanselmann
2325 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2326 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2327 4679547e Michael Hanselmann
2328 4679547e Michael Hanselmann
    # Change priority
2329 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(-10),
2330 4679547e Michael Hanselmann
                     (True,
2331 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2332 4679547e Michael Hanselmann
                       " been changed to -10")))
2333 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2334 4679547e Michael Hanselmann
2335 4679547e Michael Hanselmann
    # Process second opcode
2336 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2337 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2338 4679547e Michael Hanselmann
2339 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), -10)
2340 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2341 4679547e Michael Hanselmann
2342 4679547e Michael Hanselmann
    # Check status
2343 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2344 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2345 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2346 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2347 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2348 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2349 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2350 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2351 4679547e Michael Hanselmann
                      ["Res0", "Res1", None]])
2352 4679547e Michael Hanselmann
2353 4679547e Michael Hanselmann
    # Change priority once more
2354 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(5),
2355 4679547e Michael Hanselmann
                     (True,
2356 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2357 4679547e Michael Hanselmann
                       " been changed to 5")))
2358 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), 5)
2359 4679547e Michael Hanselmann
2360 4679547e Michael Hanselmann
    # Process third opcode
2361 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2362 4679547e Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2363 4679547e Michael Hanselmann
2364 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), 5)
2365 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2366 4679547e Michael Hanselmann
2367 4679547e Michael Hanselmann
    # Check status
2368 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2369 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2370 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2371 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2372 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2373 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2374 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2375 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS],
2376 4679547e Michael Hanselmann
                      ["Res0", "Res1", "Res2"]])
2377 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2378 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2379 4679547e Michael Hanselmann
2380 4679547e Michael Hanselmann
2381 1b5150ab Michael Hanselmann
class _IdOnlyFakeJob:
2382 1b5150ab Michael Hanselmann
  def __init__(self, job_id, priority=NotImplemented):
2383 1b5150ab Michael Hanselmann
    self.id = str(job_id)
2384 1b5150ab Michael Hanselmann
    self._priority = priority
2385 1b5150ab Michael Hanselmann
2386 1b5150ab Michael Hanselmann
  def CalcPriority(self):
2387 1b5150ab Michael Hanselmann
    return self._priority
2388 b95479a5 Michael Hanselmann
2389 1b5150ab Michael Hanselmann
2390 1b5150ab Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
2391 b95479a5 Michael Hanselmann
  def setUp(self):
2392 b95479a5 Michael Hanselmann
    self._status = []
2393 b95479a5 Michael Hanselmann
    self._queue = []
2394 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2395 b95479a5 Michael Hanselmann
2396 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
2397 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
2398 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
2399 b95479a5 Michael Hanselmann
    return result
2400 b95479a5 Michael Hanselmann
2401 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
2402 37d76f1e Michael Hanselmann
    self.assertFalse(self.jdm._lock.is_owned(),
2403 37d76f1e Michael Hanselmann
                     msg=("Must not own manager lock while re-adding jobs"
2404 37d76f1e Michael Hanselmann
                          " (potential deadlock)"))
2405 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
2406 b95479a5 Michael Hanselmann
2407 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
2408 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(17697)
2409 b95479a5 Michael Hanselmann
    job_id = str(28625)
2410 b95479a5 Michael Hanselmann
2411 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2412 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2413 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2414 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2415 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2416 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2417 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2418 b95479a5 Michael Hanselmann
      job_id: set([job]),
2419 b95479a5 Michael Hanselmann
      })
2420 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2421 fcb21ad7 Michael Hanselmann
      ("job/28625", None, None, [("job", [job.id])])
2422 fcb21ad7 Michael Hanselmann
      ])
2423 b95479a5 Michael Hanselmann
2424 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2425 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2426 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
2427 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2428 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2429 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2430 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2431 b95479a5 Michael Hanselmann
2432 942e2262 Michael Hanselmann
  def testNotFinalizedThenQueued(self):
2433 942e2262 Michael Hanselmann
    # This can happen on a queue shutdown
2434 942e2262 Michael Hanselmann
    job = _IdOnlyFakeJob(1320)
2435 942e2262 Michael Hanselmann
    job_id = str(22971)
2436 942e2262 Michael Hanselmann
2437 942e2262 Michael Hanselmann
    for i in range(5):
2438 942e2262 Michael Hanselmann
      if i > 2:
2439 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2440 942e2262 Michael Hanselmann
      else:
2441 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2442 942e2262 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2443 942e2262 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2444 942e2262 Michael Hanselmann
      self.assertFalse(self._status)
2445 942e2262 Michael Hanselmann
      self.assertFalse(self._queue)
2446 942e2262 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2447 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2448 942e2262 Michael Hanselmann
        job_id: set([job]),
2449 942e2262 Michael Hanselmann
        })
2450 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2451 942e2262 Michael Hanselmann
        ("job/22971", None, None, [("job", [job.id])])
2452 942e2262 Michael Hanselmann
        ])
2453 942e2262 Michael Hanselmann
2454 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
2455 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(5278)
2456 b95479a5 Michael Hanselmann
    job_id = str(9610)
2457 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
2458 b95479a5 Michael Hanselmann
2459 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2460 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2461 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2462 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2463 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2464 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2465 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2466 b95479a5 Michael Hanselmann
      job_id: set([job]),
2467 b95479a5 Michael Hanselmann
      })
2468 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2469 fcb21ad7 Michael Hanselmann
      ("job/9610", None, None, [("job", [job.id])])
2470 fcb21ad7 Michael Hanselmann
      ])
2471 b95479a5 Michael Hanselmann
2472 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2473 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2474 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2475 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2476 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2477 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2478 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2479 b95479a5 Michael Hanselmann
2480 b95479a5 Michael Hanselmann
  def testRequireError(self):
2481 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(21459)
2482 b95479a5 Michael Hanselmann
    job_id = str(25519)
2483 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
2484 b95479a5 Michael Hanselmann
2485 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2486 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2487 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2488 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2489 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2490 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2491 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2492 b95479a5 Michael Hanselmann
      job_id: set([job]),
2493 b95479a5 Michael Hanselmann
      })
2494 b95479a5 Michael Hanselmann
2495 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2496 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2497 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2498 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2499 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2500 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2501 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2502 b95479a5 Michael Hanselmann
2503 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
2504 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
2505 b95479a5 Michael Hanselmann
2506 b95479a5 Michael Hanselmann
    for end_status in dep_status:
2507 1b5150ab Michael Hanselmann
      job = _IdOnlyFakeJob(21343)
2508 b95479a5 Michael Hanselmann
      job_id = str(14609)
2509 b95479a5 Michael Hanselmann
2510 47099cd1 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2511 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2512 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2513 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2514 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2515 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2516 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2517 b95479a5 Michael Hanselmann
        job_id: set([job]),
2518 b95479a5 Michael Hanselmann
        })
2519 fcb21ad7 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2520 fcb21ad7 Michael Hanselmann
        ("job/14609", None, None, [("job", [job.id])])
2521 fcb21ad7 Michael Hanselmann
        ])
2522 b95479a5 Michael Hanselmann
2523 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
2524 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2525 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
2526 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2527 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2528 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
2529 fcb21ad7 Michael Hanselmann
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2530 b95479a5 Michael Hanselmann
2531 b95479a5 Michael Hanselmann
  def testNotify(self):
2532 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(8227)
2533 b95479a5 Michael Hanselmann
    job_id = str(4113)
2534 b95479a5 Michael Hanselmann
2535 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2536 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2537 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2538 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2539 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2540 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2541 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2542 b95479a5 Michael Hanselmann
      job_id: set([job]),
2543 b95479a5 Michael Hanselmann
      })
2544 b95479a5 Michael Hanselmann
2545 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
2546 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2547 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2548 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2549 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
2550 b95479a5 Michael Hanselmann
2551 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
2552 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(10102)
2553 b95479a5 Michael Hanselmann
    job_id = str(1271)
2554 b95479a5 Michael Hanselmann
2555 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2556 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2557 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2558 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2559 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2560 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2561 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2562 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2563 b95479a5 Michael Hanselmann
      job_id: set([job]),
2564 b95479a5 Michael Hanselmann
      })
2565 b95479a5 Michael Hanselmann
2566 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2567 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2568 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2569 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2570 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2571 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2572 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2573 b95479a5 Michael Hanselmann
2574 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2575 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(24273)
2576 b95479a5 Michael Hanselmann
    job_id = str(23885)
2577 b95479a5 Michael Hanselmann
2578 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2579 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2580 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2581 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2582 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2583 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2584 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2585 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2586 b95479a5 Michael Hanselmann
      job_id: set([job]),
2587 b95479a5 Michael Hanselmann
      })
2588 b95479a5 Michael Hanselmann
2589 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2590 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2591 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2592 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2593 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2594 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2595 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2596 b95479a5 Michael Hanselmann
2597 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2598 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(224)
2599 b95479a5 Michael Hanselmann
    job_id = str(3081)
2600 b95479a5 Michael Hanselmann
2601 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2602 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2603 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2604 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2605 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2606 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2607 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2608 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2609 b95479a5 Michael Hanselmann
      job_id: set(),
2610 b95479a5 Michael Hanselmann
      })
2611 b95479a5 Michael Hanselmann
2612 b95479a5 Michael Hanselmann
    # Force cleanup
2613 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2614 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2615 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2616 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2617 b95479a5 Michael Hanselmann
2618 fcb21ad7 Michael Hanselmann
  def testMultipleWaiting(self):
2619 fcb21ad7 Michael Hanselmann
    # Use a deterministic random generator
2620 fcb21ad7 Michael Hanselmann
    rnd = random.Random(21402)
2621 fcb21ad7 Michael Hanselmann
2622 fcb21ad7 Michael Hanselmann
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2623 fcb21ad7 Michael Hanselmann
2624 fcb21ad7 Michael Hanselmann
    waiters = dict((job_ids.pop(),
2625 1b5150ab Michael Hanselmann
                    set(map(_IdOnlyFakeJob,
2626 fcb21ad7 Michael Hanselmann
                            [job_ids.pop()
2627 fcb21ad7 Michael Hanselmann
                             for _ in range(rnd.randint(1, 20))])))
2628 fcb21ad7 Michael Hanselmann
                   for _ in range(10))
2629 fcb21ad7 Michael Hanselmann
2630 fcb21ad7 Michael Hanselmann
    # Ensure there are no duplicate job IDs
2631 fcb21ad7 Michael Hanselmann
    assert not utils.FindDuplicates(waiters.keys() +
2632 fcb21ad7 Michael Hanselmann
                                    [job.id
2633 fcb21ad7 Michael Hanselmann
                                     for jobs in waiters.values()
2634 fcb21ad7 Michael Hanselmann
                                     for job in jobs])
2635 fcb21ad7 Michael Hanselmann
2636 fcb21ad7 Michael Hanselmann
    # Register all jobs as waiters
2637 fcb21ad7 Michael Hanselmann
    for job_id, job in [(job_id, job)
2638 fcb21ad7 Michael Hanselmann
                        for (job_id, jobs) in waiters.items()
2639 fcb21ad7 Michael Hanselmann
                        for job in jobs]:
2640 fcb21ad7 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2641 fcb21ad7 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2642 fcb21ad7 Michael Hanselmann
                                              [constants.JOB_STATUS_SUCCESS])
2643 fcb21ad7 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2644 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._status)
2645 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._queue)
2646 fcb21ad7 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2647 fcb21ad7 Michael Hanselmann
2648 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, waiters)
2649 fcb21ad7 Michael Hanselmann
2650 fcb21ad7 Michael Hanselmann
    def _MakeSet((name, mode, owner_names, pending)):
2651 fcb21ad7 Michael Hanselmann
      return (name, mode, owner_names,
2652 fcb21ad7 Michael Hanselmann
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2653 fcb21ad7 Michael Hanselmann
2654 fcb21ad7 Michael Hanselmann
    def _CheckLockInfo():
2655 fcb21ad7 Michael Hanselmann
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2656 fcb21ad7 Michael Hanselmann
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2657 fcb21ad7 Michael Hanselmann
        ("job/%s" % job_id, None, None,
2658 fcb21ad7 Michael Hanselmann
         [("job", set([job.id for job in jobs]))])
2659 fcb21ad7 Michael Hanselmann
        for job_id, jobs in waiters.items()
2660 fcb21ad7 Michael Hanselmann
        if jobs
2661 fcb21ad7 Michael Hanselmann
        ]))
2662 fcb21ad7 Michael Hanselmann
2663 fcb21ad7 Michael Hanselmann
    _CheckLockInfo()
2664 fcb21ad7 Michael Hanselmann
2665 fcb21ad7 Michael Hanselmann
    # Notify in random order
2666 fcb21ad7 Michael Hanselmann
    for job_id in rnd.sample(waiters, len(waiters)):
2667 fcb21ad7 Michael Hanselmann
      # Remove from pending waiter list
2668 fcb21ad7 Michael Hanselmann
      jobs = waiters.pop(job_id)
2669 fcb21ad7 Michael Hanselmann
      for job in jobs:
2670 fcb21ad7 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2671 fcb21ad7 Michael Hanselmann
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2672 fcb21ad7 Michael Hanselmann
                                                [constants.JOB_STATUS_SUCCESS])
2673 fcb21ad7 Michael Hanselmann
        self.assertEqual(result, self.jdm.CONTINUE)
2674 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._status)
2675 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._queue)
2676 fcb21ad7 Michael Hanselmann
        self.assertFalse(self.jdm.JobWaiting(job))
2677 fcb21ad7 Michael Hanselmann
2678 fcb21ad7 Michael Hanselmann
      _CheckLockInfo()
2679 fcb21ad7 Michael Hanselmann
2680 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2681 fcb21ad7 Michael Hanselmann
2682 fcb21ad7 Michael Hanselmann
    assert not waiters
2683 fcb21ad7 Michael Hanselmann
2684 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2685 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(18937)
2686 b95479a5 Michael Hanselmann
2687 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2688 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2689 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2690 b95479a5 Michael Hanselmann
2691 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2692 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(30540)
2693 b95479a5 Michael Hanselmann
    job_id = str(23769)
2694 b95479a5 Michael Hanselmann
2695 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2696 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2697 b95479a5 Michael Hanselmann
2698 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2699 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2700 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2701 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2702 fcb21ad7 Michael Hanselmann
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2703 b95479a5 Michael Hanselmann
2704 b95479a5 Michael Hanselmann
2705 989a8bee Michael Hanselmann
if __name__ == "__main__":
2706 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()