Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ fb60bc6a

History | View | Annotate | Download (94.5 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 76b62028 Iustin Pop
# Copyright (C) 2010, 2011, 2012 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 4679547e Michael Hanselmann
import operator
33 989a8bee Michael Hanselmann
34 989a8bee Michael Hanselmann
from ganeti import constants
35 989a8bee Michael Hanselmann
from ganeti import utils
36 989a8bee Michael Hanselmann
from ganeti import errors
37 989a8bee Michael Hanselmann
from ganeti import jqueue
38 8f5c488d Michael Hanselmann
from ganeti import opcodes
39 8f5c488d Michael Hanselmann
from ganeti import compat
40 26d3fd2f Michael Hanselmann
from ganeti import mcpu
41 fcb21ad7 Michael Hanselmann
from ganeti import query
42 df5a5730 Michael Hanselmann
from ganeti import workerpool
43 989a8bee Michael Hanselmann
44 989a8bee Michael Hanselmann
import testutils
45 989a8bee Michael Hanselmann
46 989a8bee Michael Hanselmann
47 989a8bee Michael Hanselmann
class _FakeJob:
48 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
49 989a8bee Michael Hanselmann
    self.id = job_id
50 c0f6d0d8 Michael Hanselmann
    self.writable = False
51 989a8bee Michael Hanselmann
    self._status = status
52 989a8bee Michael Hanselmann
    self._log = []
53 989a8bee Michael Hanselmann
54 989a8bee Michael Hanselmann
  def SetStatus(self, status):
55 989a8bee Michael Hanselmann
    self._status = status
56 989a8bee Michael Hanselmann
57 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
58 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
59 989a8bee Michael Hanselmann
60 989a8bee Michael Hanselmann
  def CalcStatus(self):
61 989a8bee Michael Hanselmann
    return self._status
62 989a8bee Michael Hanselmann
63 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
64 989a8bee Michael Hanselmann
    result = []
65 989a8bee Michael Hanselmann
66 989a8bee Michael Hanselmann
    for name in fields:
67 989a8bee Michael Hanselmann
      if name == "status":
68 989a8bee Michael Hanselmann
        result.append(self._status)
69 989a8bee Michael Hanselmann
      else:
70 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    return result
73 989a8bee Michael Hanselmann
74 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
75 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
76 989a8bee Michael Hanselmann
77 989a8bee Michael Hanselmann
    if newer_than is None:
78 989a8bee Michael Hanselmann
      return self._log
79 989a8bee Michael Hanselmann
80 989a8bee Michael Hanselmann
    return self._log[newer_than:]
81 989a8bee Michael Hanselmann
82 989a8bee Michael Hanselmann
83 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
84 989a8bee Michael Hanselmann
  def testStatus(self):
85 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
86 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
87 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
88 989a8bee Michael Hanselmann
89 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
90 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
91 989a8bee Michael Hanselmann
92 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
93 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
94 989a8bee Michael Hanselmann
95 989a8bee Michael Hanselmann
    # job.id is used by checker
96 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
97 989a8bee Michael Hanselmann
98 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
99 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
100 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
101 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
102 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
103 989a8bee Michael Hanselmann
104 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
105 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
106 989a8bee Michael Hanselmann
107 989a8bee Michael Hanselmann
  def testFinalStatus(self):
108 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
109 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
110 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
111 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
112 989a8bee Michael Hanselmann
      # a change immediately
113 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
114 989a8bee Michael Hanselmann
115 989a8bee Michael Hanselmann
  def testLog(self):
116 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
117 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
118 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
119 989a8bee Michael Hanselmann
120 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
121 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
122 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
123 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
124 989a8bee Michael Hanselmann
125 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
126 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
127 989a8bee Michael Hanselmann
128 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
129 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
132 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
134 989a8bee Michael Hanselmann
135 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
136 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
137 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
138 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
139 989a8bee Michael Hanselmann
140 989a8bee Michael Hanselmann
141 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
142 989a8bee Michael Hanselmann
  def setUp(self):
143 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
144 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
145 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
146 989a8bee Michael Hanselmann
147 989a8bee Michael Hanselmann
  def tearDown(self):
148 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
149 989a8bee Michael Hanselmann
150 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
151 989a8bee Michael Hanselmann
    try:
152 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
153 989a8bee Michael Hanselmann
    except EnvironmentError, err:
154 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
155 989a8bee Michael Hanselmann
    else:
156 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
157 989a8bee Michael Hanselmann
158 989a8bee Michael Hanselmann
  def testClose(self):
159 989a8bee Michael Hanselmann
    for wait in [False, True]:
160 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
161 989a8bee Michael Hanselmann
      try:
162 989a8bee Michael Hanselmann
        if wait:
163 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
164 989a8bee Michael Hanselmann
      finally:
165 989a8bee Michael Hanselmann
        waiter.Close()
166 989a8bee Michael Hanselmann
167 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
168 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
169 989a8bee Michael Hanselmann
170 989a8bee Michael Hanselmann
  def testChangingFile(self):
171 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
172 989a8bee Michael Hanselmann
    try:
173 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
174 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
175 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
176 989a8bee Michael Hanselmann
    finally:
177 989a8bee Michael Hanselmann
      waiter.Close()
178 989a8bee Michael Hanselmann
179 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
180 989a8bee Michael Hanselmann
181 989a8bee Michael Hanselmann
  def testChangingFile2(self):
182 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
183 989a8bee Michael Hanselmann
    try:
184 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
185 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
186 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
187 989a8bee Michael Hanselmann
188 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
189 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
190 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
191 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
192 989a8bee Michael Hanselmann
    finally:
193 989a8bee Michael Hanselmann
      waiter.Close()
194 989a8bee Michael Hanselmann
195 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196 989a8bee Michael Hanselmann
197 989a8bee Michael Hanselmann
198 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
199 989a8bee Michael Hanselmann
  def setUp(self):
200 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
201 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
202 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
203 989a8bee Michael Hanselmann
204 989a8bee Michael Hanselmann
  def tearDown(self):
205 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
206 989a8bee Michael Hanselmann
207 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
208 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
209 989a8bee Michael Hanselmann
210 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
211 989a8bee Michael Hanselmann
    return None
212 989a8bee Michael Hanselmann
213 989a8bee Michael Hanselmann
  def testNoChanges(self):
214 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
215 989a8bee Michael Hanselmann
216 989a8bee Michael Hanselmann
    # No change
217 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
218 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
219 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
220 989a8bee Michael Hanselmann
221 989a8bee Michael Hanselmann
    # No previous information
222 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
223 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
224 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
  def testLostJob(self):
227 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
228 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
229 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
230 989a8bee Michael Hanselmann
231 989a8bee Michael Hanselmann
232 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
233 6760e4ed Michael Hanselmann
  def test(self):
234 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
235 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
236 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
237 6760e4ed Michael Hanselmann
238 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
239 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
240 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
241 6760e4ed Michael Hanselmann
242 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
243 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
244 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245 6760e4ed Michael Hanselmann
246 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
247 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
248 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
249 6760e4ed Michael Hanselmann
250 6760e4ed Michael Hanselmann
251 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
252 8f5c488d Michael Hanselmann
  def testDefaults(self):
253 8f5c488d Michael Hanselmann
    def _Check(op):
254 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
255 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
256 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
257 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
258 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
259 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
260 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
261 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
262 8f5c488d Michael Hanselmann
263 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
264 8f5c488d Michael Hanselmann
    _Check(op1)
265 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
266 8f5c488d Michael Hanselmann
    _Check(op2)
267 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
268 8f5c488d Michael Hanselmann
269 8f5c488d Michael Hanselmann
  def testPriority(self):
270 8f5c488d Michael Hanselmann
    def _Check(op):
271 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
272 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
273 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
274 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
275 8f5c488d Michael Hanselmann
276 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
277 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
278 8f5c488d Michael Hanselmann
    _Check(op1)
279 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
280 8f5c488d Michael Hanselmann
    _Check(op2)
281 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
282 8f5c488d Michael Hanselmann
283 8f5c488d Michael Hanselmann
284 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
285 4679547e Michael Hanselmann
  def testNoOpCodes(self):
286 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
287 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
288 5f6b0b71 Michael Hanselmann
289 8f5c488d Michael Hanselmann
  def testDefaults(self):
290 8f5c488d Michael Hanselmann
    job_id = 4260
291 8f5c488d Michael Hanselmann
    ops = [
292 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
293 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
294 8f5c488d Michael Hanselmann
      ]
295 8f5c488d Michael Hanselmann
296 8f5c488d Michael Hanselmann
    def _Check(job):
297 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
298 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
299 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
300 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
301 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
302 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
303 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
304 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
305 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
306 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
307 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
308 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
309 a06c6ae8 Michael Hanselmann
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
310 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
311 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
312 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
313 8a3cd185 Michael Hanselmann
      self.assertFalse(job.archived)
314 8f5c488d Michael Hanselmann
315 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
316 8f5c488d Michael Hanselmann
    _Check(job1)
317 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
318 8f5c488d Michael Hanselmann
    _Check(job2)
319 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
320 8f5c488d Michael Hanselmann
321 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
322 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
323 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
324 c0f6d0d8 Michael Hanselmann
325 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
326 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
327 c0f6d0d8 Michael Hanselmann
328 8a3cd185 Michael Hanselmann
  def testArchived(self):
329 8a3cd185 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
330 8a3cd185 Michael Hanselmann
    self.assertFalse(job.archived)
331 8a3cd185 Michael Hanselmann
332 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
333 8a3cd185 Michael Hanselmann
    self.assertTrue(newjob.archived)
334 8a3cd185 Michael Hanselmann
335 8a3cd185 Michael Hanselmann
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
336 8a3cd185 Michael Hanselmann
    self.assertFalse(newjob2.archived)
337 8a3cd185 Michael Hanselmann
338 8f5c488d Michael Hanselmann
  def testPriority(self):
339 8f5c488d Michael Hanselmann
    job_id = 4283
340 8f5c488d Michael Hanselmann
    ops = [
341 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
342 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
343 8f5c488d Michael Hanselmann
      ]
344 8f5c488d Michael Hanselmann
345 8f5c488d Michael Hanselmann
    def _Check(job):
346 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
347 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
348 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
349 8f5c488d Michael Hanselmann
350 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
351 8f5c488d Michael Hanselmann
    _Check(job)
352 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
353 8f5c488d Michael Hanselmann
                            for op in job.ops))
354 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
355 8f5c488d Michael Hanselmann
356 8f5c488d Michael Hanselmann
    # Increase first
357 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
358 8f5c488d Michael Hanselmann
    _Check(job)
359 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
360 8f5c488d Michael Hanselmann
361 8f5c488d Michael Hanselmann
    # Mark opcode as finished
362 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
363 8f5c488d Michael Hanselmann
    _Check(job)
364 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
365 8f5c488d Michael Hanselmann
366 8f5c488d Michael Hanselmann
    # Increase second
367 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
368 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
369 8f5c488d Michael Hanselmann
370 8f5c488d Michael Hanselmann
    # Test increasing first
371 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
372 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
373 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
374 8f5c488d Michael Hanselmann
375 4679547e Michael Hanselmann
  def _JobForPriority(self, job_id):
376 4679547e Michael Hanselmann
    ops = [
377 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
378 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
379 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
380 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
381 4679547e Michael Hanselmann
      ]
382 4679547e Michael Hanselmann
383 4679547e Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
384 4679547e Michael Hanselmann
385 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
386 4679547e Michael Hanselmann
                               for op in job.ops))
387 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
388 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
389 4679547e Michael Hanselmann
                                for op in job.ops))
390 4679547e Michael Hanselmann
391 4679547e Michael Hanselmann
    return job
392 4679547e Michael Hanselmann
393 4679547e Michael Hanselmann
  def testChangePriorityAllQueued(self):
394 4679547e Michael Hanselmann
    job = self._JobForPriority(24984)
395 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
396 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
397 4679547e Michael Hanselmann
                               for op in job.ops))
398 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
399 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
400 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
401 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
402 4679547e Michael Hanselmann
    self.assertEqual(result,
403 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job 24984 have"
404 4679547e Michael Hanselmann
                             " been changed to -10")))
405 4679547e Michael Hanselmann
406 4679547e Michael Hanselmann
  def testChangePriorityAllFinished(self):
407 4679547e Michael Hanselmann
    job = self._JobForPriority(16405)
408 4679547e Michael Hanselmann
409 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
410 4679547e Michael Hanselmann
      if idx > 2:
411 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
412 4679547e Michael Hanselmann
      else:
413 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
414 4679547e Michael Hanselmann
415 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
416 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
417 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
418 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
419 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
420 4679547e Michael Hanselmann
                               for op in job.ops))
421 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
422 4679547e Michael Hanselmann
                                for op in job.ops))
423 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
424 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
425 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
426 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
427 4679547e Michael Hanselmann
      constants.OP_STATUS_ERROR,
428 4679547e Michael Hanselmann
      ])
429 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 16405 is finished"))
430 4679547e Michael Hanselmann
431 4679547e Michael Hanselmann
  def testChangePriorityCancelling(self):
432 4679547e Michael Hanselmann
    job = self._JobForPriority(31572)
433 4679547e Michael Hanselmann
434 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
435 4679547e Michael Hanselmann
      if idx > 1:
436 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
437 4679547e Michael Hanselmann
      else:
438 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
439 4679547e Michael Hanselmann
440 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
441 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
442 4679547e Michael Hanselmann
    result = job.ChangePriority(5)
443 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
444 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
445 4679547e Michael Hanselmann
                               for op in job.ops))
446 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
447 4679547e Michael Hanselmann
                                for op in job.ops))
448 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
449 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
450 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
451 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
452 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
453 4679547e Michael Hanselmann
      ])
454 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
455 4679547e Michael Hanselmann
456 4679547e Michael Hanselmann
  def testChangePriorityFirstRunning(self):
457 4679547e Michael Hanselmann
    job = self._JobForPriority(1716215889)
458 4679547e Michael Hanselmann
459 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
460 4679547e Michael Hanselmann
      if idx == 0:
461 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
462 4679547e Michael Hanselmann
      else:
463 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
464 4679547e Michael Hanselmann
465 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
466 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467 4679547e Michael Hanselmann
    result = job.ChangePriority(7)
468 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
469 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
470 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
471 4679547e Michael Hanselmann
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
472 4679547e Michael Hanselmann
                     [None, 7, 7, 7])
473 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
474 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
475 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
476 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
477 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
478 4679547e Michael Hanselmann
      ])
479 4679547e Michael Hanselmann
    self.assertEqual(result,
480 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
481 4679547e Michael Hanselmann
                             " 1716215889 have been changed to 7")))
482 4679547e Michael Hanselmann
483 4679547e Michael Hanselmann
  def testChangePriorityLastRunning(self):
484 4679547e Michael Hanselmann
    job = self._JobForPriority(1308)
485 4679547e Michael Hanselmann
486 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
487 4679547e Michael Hanselmann
      if idx == (len(job.ops) - 1):
488 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
489 4679547e Michael Hanselmann
      else:
490 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
491 4679547e Michael Hanselmann
492 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
493 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
494 4679547e Michael Hanselmann
    result = job.ChangePriority(-3)
495 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
496 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
497 4679547e Michael Hanselmann
                               for op in job.ops))
498 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
499 4679547e Michael Hanselmann
                                for op in job.ops))
500 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
501 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
502 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
503 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
504 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
505 4679547e Michael Hanselmann
      ])
506 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
507 4679547e Michael Hanselmann
508 4679547e Michael Hanselmann
  def testChangePrioritySecondOpcodeRunning(self):
509 4679547e Michael Hanselmann
    job = self._JobForPriority(27701)
510 4679547e Michael Hanselmann
511 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
512 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
513 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
514 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
515 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_QUEUED
516 4679547e Michael Hanselmann
517 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
518 4679547e Michael Hanselmann
    result = job.ChangePriority(-19)
519 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -19)
520 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
521 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
522 4679547e Michael Hanselmann
                      -19, -19])
523 4679547e Michael Hanselmann
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
524 4679547e Michael Hanselmann
                     [None, None, -19, -19])
525 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
526 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
527 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
528 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
529 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
530 4679547e Michael Hanselmann
      ])
531 4679547e Michael Hanselmann
    self.assertEqual(result,
532 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
533 4679547e Michael Hanselmann
                             " 27701 have been changed to -19")))
534 4679547e Michael Hanselmann
535 4679547e Michael Hanselmann
  def testChangePriorityWithInconsistentJob(self):
536 4679547e Michael Hanselmann
    job = self._JobForPriority(30097)
537 4679547e Michael Hanselmann
538 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
539 4679547e Michael Hanselmann
540 4679547e Michael Hanselmann
    # This job is invalid (as it has two opcodes marked as running) and make
541 4679547e Michael Hanselmann
    # the call fail because an unprocessed opcode precedes a running one (which
542 4679547e Michael Hanselmann
    # should never happen in reality)
543 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
544 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
545 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
546 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_RUNNING
547 4679547e Michael Hanselmann
548 4679547e Michael Hanselmann
    self.assertRaises(AssertionError, job.ChangePriority, 19)
549 4679547e Michael Hanselmann
550 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
551 db5bce34 Michael Hanselmann
    def _Queued(ops):
552 db5bce34 Michael Hanselmann
      # The default status is "queued"
553 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
554 db5bce34 Michael Hanselmann
                              for op in ops))
555 db5bce34 Michael Hanselmann
556 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
557 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
558 db5bce34 Michael Hanselmann
559 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
560 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
561 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
562 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
563 db5bce34 Michael Hanselmann
564 db5bce34 Michael Hanselmann
    def _Running(ops):
565 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
566 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
567 db5bce34 Michael Hanselmann
      for op in ops[2:]:
568 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
569 db5bce34 Michael Hanselmann
570 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
571 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
572 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
573 db5bce34 Michael Hanselmann
      for op in ops[2:]:
574 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
575 db5bce34 Michael Hanselmann
576 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
577 db5bce34 Michael Hanselmann
      for op in ops:
578 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
579 db5bce34 Michael Hanselmann
580 db5bce34 Michael Hanselmann
    def _Canceled(ops):
581 db5bce34 Michael Hanselmann
      for op in ops:
582 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
583 db5bce34 Michael Hanselmann
584 db5bce34 Michael Hanselmann
    def _Error1(ops):
585 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
586 db5bce34 Michael Hanselmann
        if idx > 3:
587 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
588 db5bce34 Michael Hanselmann
        else:
589 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
590 db5bce34 Michael Hanselmann
591 db5bce34 Michael Hanselmann
    def _Error2(ops):
592 db5bce34 Michael Hanselmann
      for op in ops:
593 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
594 db5bce34 Michael Hanselmann
595 db5bce34 Michael Hanselmann
    def _Success(ops):
596 db5bce34 Michael Hanselmann
      for op in ops:
597 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
598 db5bce34 Michael Hanselmann
599 db5bce34 Michael Hanselmann
    tests = {
600 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
601 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
602 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
603 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
604 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
605 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
606 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
607 db5bce34 Michael Hanselmann
      }
608 db5bce34 Michael Hanselmann
609 db5bce34 Michael Hanselmann
    def _NewJob():
610 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
611 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
612 c0f6d0d8 Michael Hanselmann
                              True)
613 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
614 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
615 db5bce34 Michael Hanselmann
                              for op in job.ops))
616 db5bce34 Michael Hanselmann
      return job
617 db5bce34 Michael Hanselmann
618 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
619 db5bce34 Michael Hanselmann
      sttests = tests[status]
620 db5bce34 Michael Hanselmann
      assert sttests
621 db5bce34 Michael Hanselmann
      for fn in sttests:
622 db5bce34 Michael Hanselmann
        job = _NewJob()
623 db5bce34 Michael Hanselmann
        fn(job.ops)
624 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
625 db5bce34 Michael Hanselmann
626 8f5c488d Michael Hanselmann
627 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
628 be760ba8 Michael Hanselmann
  def __init__(self):
629 b95479a5 Michael Hanselmann
    self._checks = []
630 b95479a5 Michael Hanselmann
    self._notifications = []
631 b95479a5 Michael Hanselmann
    self._waiting = set()
632 b95479a5 Michael Hanselmann
633 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
634 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
635 b95479a5 Michael Hanselmann
636 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
637 b95479a5 Michael Hanselmann
    return len(self._checks)
638 b95479a5 Michael Hanselmann
639 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
640 b95479a5 Michael Hanselmann
    return len(self._waiting)
641 b95479a5 Michael Hanselmann
642 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
643 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
644 b95479a5 Michael Hanselmann
645 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
646 b95479a5 Michael Hanselmann
    return job in self._waiting
647 b95479a5 Michael Hanselmann
648 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
649 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
650 b95479a5 Michael Hanselmann
651 b95479a5 Michael Hanselmann
    assert exp_job == job
652 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
653 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
654 b95479a5 Michael Hanselmann
655 b95479a5 Michael Hanselmann
    (result_status, _) = result
656 b95479a5 Michael Hanselmann
657 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
658 b95479a5 Michael Hanselmann
      self._waiting.add(job)
659 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
660 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
661 b95479a5 Michael Hanselmann
662 b95479a5 Michael Hanselmann
    return result
663 b95479a5 Michael Hanselmann
664 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
665 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
666 b95479a5 Michael Hanselmann
667 b95479a5 Michael Hanselmann
668 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
669 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
670 b95479a5 Michael Hanselmann
    return False
671 b95479a5 Michael Hanselmann
672 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
673 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
674 b95479a5 Michael Hanselmann
675 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
676 b95479a5 Michael Hanselmann
    pass
677 b95479a5 Michael Hanselmann
678 b95479a5 Michael Hanselmann
679 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
680 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
681 be760ba8 Michael Hanselmann
    self._acquired = False
682 ebb2a2a3 Michael Hanselmann
    self._updates = []
683 6a373640 Michael Hanselmann
    self._submitted = []
684 942e2262 Michael Hanselmann
    self._accepting_jobs = True
685 6a373640 Michael Hanselmann
686 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
687 be760ba8 Michael Hanselmann
688 b95479a5 Michael Hanselmann
    if depmgr:
689 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
690 b95479a5 Michael Hanselmann
    else:
691 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
692 b95479a5 Michael Hanselmann
693 be760ba8 Michael Hanselmann
  def IsAcquired(self):
694 be760ba8 Michael Hanselmann
    return self._acquired
695 be760ba8 Michael Hanselmann
696 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
697 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
698 ebb2a2a3 Michael Hanselmann
699 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
700 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
701 6a373640 Michael Hanselmann
702 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
703 be760ba8 Michael Hanselmann
    assert shared == 1
704 be760ba8 Michael Hanselmann
    self._acquired = True
705 be760ba8 Michael Hanselmann
706 be760ba8 Michael Hanselmann
  def release(self):
707 be760ba8 Michael Hanselmann
    assert self._acquired
708 be760ba8 Michael Hanselmann
    self._acquired = False
709 be760ba8 Michael Hanselmann
710 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
711 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
712 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
713 be760ba8 Michael Hanselmann
714 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
715 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
716 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
717 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
718 6a373640 Michael Hanselmann
    return job_ids
719 6a373640 Michael Hanselmann
720 942e2262 Michael Hanselmann
  def StopAcceptingJobs(self):
721 942e2262 Michael Hanselmann
    self._accepting_jobs = False
722 942e2262 Michael Hanselmann
723 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
724 942e2262 Michael Hanselmann
    return self._accepting_jobs
725 942e2262 Michael Hanselmann
726 be760ba8 Michael Hanselmann
727 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
728 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
729 ebb2a2a3 Michael Hanselmann
    self._queue = queue
730 be760ba8 Michael Hanselmann
    self._before_start = before_start
731 be760ba8 Michael Hanselmann
    self._after_start = after_start
732 be760ba8 Michael Hanselmann
733 e4e59de8 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None):
734 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
735 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
736 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
737 be760ba8 Michael Hanselmann
738 be760ba8 Michael Hanselmann
    if self._before_start:
739 e4e59de8 Michael Hanselmann
      self._before_start(timeout, cbs.CurrentPriority())
740 be760ba8 Michael Hanselmann
741 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
742 be760ba8 Michael Hanselmann
743 be760ba8 Michael Hanselmann
    if self._after_start:
744 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
745 be760ba8 Michael Hanselmann
746 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
747 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
748 ebb2a2a3 Michael Hanselmann
749 be760ba8 Michael Hanselmann
    if op.fail:
750 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
751 be760ba8 Michael Hanselmann
752 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
753 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
754 6a373640 Michael Hanselmann
755 be760ba8 Michael Hanselmann
    return op.result
756 be760ba8 Michael Hanselmann
757 be760ba8 Michael Hanselmann
758 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
759 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
760 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
761 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
762 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
763 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
764 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
765 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
766 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
767 be760ba8 Michael Hanselmann
    return job
768 be760ba8 Michael Hanselmann
769 26d3fd2f Michael Hanselmann
770 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
771 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
772 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
773 be760ba8 Michael Hanselmann
                      for op in job.ops)
774 be760ba8 Michael Hanselmann
775 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
776 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
777 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
778 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
779 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
780 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
781 be760ba8 Michael Hanselmann
                      job.start_timestamp,
782 be760ba8 Michael Hanselmann
                      job.end_timestamp])
783 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
784 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
785 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
786 be760ba8 Michael Hanselmann
787 be760ba8 Michael Hanselmann
  def testSuccess(self):
788 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
789 be760ba8 Michael Hanselmann
790 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
791 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
792 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
793 be760ba8 Michael Hanselmann
             for i in range(opcount)]
794 be760ba8 Michael Hanselmann
795 be760ba8 Michael Hanselmann
      # Create job
796 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
797 be760ba8 Michael Hanselmann
798 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
799 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
800 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
801 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
802 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
803 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
804 be760ba8 Michael Hanselmann
805 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
806 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
807 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
808 ebb2a2a3 Michael Hanselmann
809 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
810 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
811 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
812 be760ba8 Michael Hanselmann
813 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
814 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
815 be760ba8 Michael Hanselmann
        self.assertFalse(success)
816 be760ba8 Michael Hanselmann
817 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
818 be760ba8 Michael Hanselmann
819 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
820 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
821 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
822 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
823 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
824 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
825 be760ba8 Michael Hanselmann
          # Last opcode
826 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
827 be760ba8 Michael Hanselmann
        else:
828 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
829 be760ba8 Michael Hanselmann
830 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
831 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
832 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
833 be760ba8 Michael Hanselmann
834 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
835 ebb2a2a3 Michael Hanselmann
836 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
837 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
838 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
839 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
840 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
841 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
842 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
843 be760ba8 Michael Hanselmann
                              for op in job.ops))
844 be760ba8 Michael Hanselmann
845 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
846 be760ba8 Michael Hanselmann
847 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
848 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
849 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
850 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
851 be760ba8 Michael Hanselmann
852 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
853 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
854 be760ba8 Michael Hanselmann
855 be760ba8 Michael Hanselmann
    testdata = [
856 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
857 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
858 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
859 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
860 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
861 be760ba8 Michael Hanselmann
      ]
862 be760ba8 Michael Hanselmann
863 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
864 be760ba8 Michael Hanselmann
      # Prepare opcodes
865 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
866 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
867 be760ba8 Michael Hanselmann
                                       i <= failto))
868 be760ba8 Michael Hanselmann
             for i in range(opcount)]
869 be760ba8 Michael Hanselmann
870 be760ba8 Michael Hanselmann
      # Create job
871 a06c6ae8 Michael Hanselmann
      job = self._CreateJob(queue, str(job_id), ops)
872 be760ba8 Michael Hanselmann
873 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
874 be760ba8 Michael Hanselmann
875 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
876 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
877 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
878 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
879 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
880 ebb2a2a3 Michael Hanselmann
        # waitlock to running
881 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
882 ebb2a2a3 Michael Hanselmann
        # Opcode result
883 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
884 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
885 be760ba8 Michael Hanselmann
886 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
887 be760ba8 Michael Hanselmann
          # Last opcode
888 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
889 be760ba8 Michael Hanselmann
          break
890 be760ba8 Michael Hanselmann
891 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
892 be760ba8 Michael Hanselmann
893 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
894 be760ba8 Michael Hanselmann
895 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
896 ebb2a2a3 Michael Hanselmann
897 be760ba8 Michael Hanselmann
      # Check job status
898 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
899 76b62028 Iustin Pop
      self.assertEqual(job.GetInfo(["id"]), [job_id])
900 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
901 be760ba8 Michael Hanselmann
902 be760ba8 Michael Hanselmann
      # Check opcode status
903 be760ba8 Michael Hanselmann
      data = zip(job.ops,
904 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
905 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
906 be760ba8 Michael Hanselmann
907 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
908 be760ba8 Michael Hanselmann
        if idx < failfrom:
909 be760ba8 Michael Hanselmann
          assert not op.input.fail
910 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
911 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
912 be760ba8 Michael Hanselmann
        elif idx <= failto:
913 be760ba8 Michael Hanselmann
          assert op.input.fail
914 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
915 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
916 be760ba8 Michael Hanselmann
        else:
917 be760ba8 Michael Hanselmann
          assert not op.input.fail
918 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
919 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
920 be760ba8 Michael Hanselmann
921 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
922 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
923 be760ba8 Michael Hanselmann
924 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
925 be760ba8 Michael Hanselmann
926 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
927 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
928 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
929 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
930 be760ba8 Michael Hanselmann
931 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
932 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
933 be760ba8 Michael Hanselmann
934 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
935 be760ba8 Michael Hanselmann
           for i in range(5)]
936 be760ba8 Michael Hanselmann
937 be760ba8 Michael Hanselmann
    # Create job
938 be760ba8 Michael Hanselmann
    job_id = 17045
939 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
940 be760ba8 Michael Hanselmann
941 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942 be760ba8 Michael Hanselmann
943 be760ba8 Michael Hanselmann
    # Mark as cancelled
944 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
945 be760ba8 Michael Hanselmann
    self.assert_(success)
946 be760ba8 Michael Hanselmann
947 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
948 ebb2a2a3 Michael Hanselmann
949 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
950 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
951 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
952 be760ba8 Michael Hanselmann
                            for op in job.ops))
953 be760ba8 Michael Hanselmann
954 66bd7445 Michael Hanselmann
    # Serialize to check for differences
955 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
956 66bd7445 Michael Hanselmann
957 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
958 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
959 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
960 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
961 30c945d0 Michael Hanselmann
962 30c945d0 Michael Hanselmann
    # Check result
963 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
964 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
965 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
966 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
967 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
968 30c945d0 Michael Hanselmann
                                for op in job.ops))
969 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
970 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
971 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
972 30c945d0 Michael Hanselmann
973 66bd7445 Michael Hanselmann
    # Must not have changed or written
974 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
975 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
976 66bd7445 Michael Hanselmann
977 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
978 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
979 30c945d0 Michael Hanselmann
980 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
981 30c945d0 Michael Hanselmann
           for i in range(5)]
982 30c945d0 Michael Hanselmann
983 30c945d0 Michael Hanselmann
    # Create job
984 30c945d0 Michael Hanselmann
    job_id = 8645
985 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
986 30c945d0 Michael Hanselmann
987 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
988 30c945d0 Michael Hanselmann
989 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
990 30c945d0 Michael Hanselmann
991 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
992 30c945d0 Michael Hanselmann
993 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
994 30c945d0 Michael Hanselmann
995 30c945d0 Michael Hanselmann
    # Mark as cancelling
996 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
997 30c945d0 Michael Hanselmann
    self.assert_(success)
998 30c945d0 Michael Hanselmann
999 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1000 30c945d0 Michael Hanselmann
1001 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1002 30c945d0 Michael Hanselmann
                            for op in job.ops))
1003 30c945d0 Michael Hanselmann
1004 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1005 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1006 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1007 be760ba8 Michael Hanselmann
1008 be760ba8 Michael Hanselmann
    # Check result
1009 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1010 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1011 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1012 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1013 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1014 be760ba8 Michael Hanselmann
                                for op in job.ops))
1015 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1016 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1017 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1018 be760ba8 Michael Hanselmann
1019 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
1020 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1021 be760ba8 Michael Hanselmann
1022 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1023 be760ba8 Michael Hanselmann
           for i in range(5)]
1024 be760ba8 Michael Hanselmann
1025 be760ba8 Michael Hanselmann
    # Create job
1026 be760ba8 Michael Hanselmann
    job_id = 11009
1027 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1028 be760ba8 Michael Hanselmann
1029 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1030 be760ba8 Michael Hanselmann
1031 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1032 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1033 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1034 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1035 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1036 be760ba8 Michael Hanselmann
1037 be760ba8 Michael Hanselmann
      # Mark as cancelled
1038 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
1039 be760ba8 Michael Hanselmann
      self.assert_(success)
1040 be760ba8 Michael Hanselmann
1041 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1042 be760ba8 Michael Hanselmann
                              for op in job.ops))
1043 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1044 be760ba8 Michael Hanselmann
1045 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1046 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1047 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1048 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1049 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1050 be760ba8 Michael Hanselmann
1051 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1052 be760ba8 Michael Hanselmann
1053 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1054 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1055 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1056 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1057 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1058 be760ba8 Michael Hanselmann
1059 be760ba8 Michael Hanselmann
    # Check result
1060 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1061 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1062 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
1063 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1064 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1065 be760ba8 Michael Hanselmann
                                for op in job.ops))
1066 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1067 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1068 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1069 be760ba8 Michael Hanselmann
1070 942e2262 Michael Hanselmann
  def _TestCancelWhileSomething(self, cb):
1071 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
1072 9e49dfc5 Michael Hanselmann
1073 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1074 9e49dfc5 Michael Hanselmann
           for i in range(5)]
1075 9e49dfc5 Michael Hanselmann
1076 9e49dfc5 Michael Hanselmann
    # Create job
1077 9e49dfc5 Michael Hanselmann
    job_id = 24314
1078 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1079 9e49dfc5 Michael Hanselmann
1080 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1081 9e49dfc5 Michael Hanselmann
1082 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1083 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1084 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1085 9e49dfc5 Michael Hanselmann
1086 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
1087 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
1088 9e49dfc5 Michael Hanselmann
      self.assert_(success)
1089 9e49dfc5 Michael Hanselmann
1090 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1091 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
1092 9e49dfc5 Michael Hanselmann
1093 942e2262 Michael Hanselmann
      cb(queue)
1094 9e49dfc5 Michael Hanselmann
1095 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
1096 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
1097 9e49dfc5 Michael Hanselmann
1098 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1099 9e49dfc5 Michael Hanselmann
1100 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1101 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1102 9e49dfc5 Michael Hanselmann
1103 9e49dfc5 Michael Hanselmann
    # Check result
1104 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1105 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1106 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
1107 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
1108 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1109 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
1110 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1111 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1112 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1113 9e49dfc5 Michael Hanselmann
1114 942e2262 Michael Hanselmann
    return queue
1115 942e2262 Michael Hanselmann
1116 942e2262 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
1117 942e2262 Michael Hanselmann
    def fn(_):
1118 942e2262 Michael Hanselmann
      # Fake an acquire attempt timing out
1119 942e2262 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1120 942e2262 Michael Hanselmann
1121 942e2262 Michael Hanselmann
    self._TestCancelWhileSomething(fn)
1122 942e2262 Michael Hanselmann
1123 942e2262 Michael Hanselmann
  def testCancelDuringQueueShutdown(self):
1124 942e2262 Michael Hanselmann
    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1125 942e2262 Michael Hanselmann
    self.assertFalse(queue.AcceptingJobsUnlocked())
1126 942e2262 Michael Hanselmann
1127 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
1128 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
1129 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1130 be760ba8 Michael Hanselmann
1131 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1132 be760ba8 Michael Hanselmann
           for i in range(3)]
1133 be760ba8 Michael Hanselmann
1134 be760ba8 Michael Hanselmann
    # Create job
1135 76b62028 Iustin Pop
    job_id = 28492
1136 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1137 be760ba8 Michael Hanselmann
1138 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1139 be760ba8 Michael Hanselmann
1140 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1141 be760ba8 Michael Hanselmann
1142 be760ba8 Michael Hanselmann
    # Run one opcode
1143 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1144 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1145 be760ba8 Michael Hanselmann
1146 be760ba8 Michael Hanselmann
    # Job goes back to queued
1147 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1148 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1149 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1150 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1151 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1152 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
1153 be760ba8 Michael Hanselmann
1154 be760ba8 Michael Hanselmann
    # Mark as cancelled
1155 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
1156 be760ba8 Michael Hanselmann
    self.assert_(success)
1157 be760ba8 Michael Hanselmann
1158 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
1159 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1160 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1161 be760ba8 Michael Hanselmann
1162 be760ba8 Michael Hanselmann
    # Check result
1163 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1164 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
1165 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1166 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1167 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1168 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1169 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1170 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1171 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
1172 be760ba8 Michael Hanselmann
1173 942e2262 Michael Hanselmann
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
1174 942e2262 Michael Hanselmann
    self.assertTrue(queue.AcceptingJobsUnlocked())
1175 942e2262 Michael Hanselmann
1176 942e2262 Michael Hanselmann
    # Simulate shutdown
1177 942e2262 Michael Hanselmann
    queue.StopAcceptingJobs()
1178 942e2262 Michael Hanselmann
1179 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1180 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1181 942e2262 Michael Hanselmann
1182 942e2262 Michael Hanselmann
    # Check result
1183 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1184 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1185 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1186 942e2262 Michael Hanselmann
    self.assertTrue(job.start_timestamp)
1187 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1188 942e2262 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1189 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1190 942e2262 Michael Hanselmann
                               for op in job.ops[:runcount]))
1191 942e2262 Michael Hanselmann
    self.assertFalse(job.ops[runcount].end_timestamp)
1192 942e2262 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1193 942e2262 Michael Hanselmann
                                for op in job.ops[(runcount + 1):]))
1194 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1195 942e2262 Michael Hanselmann
                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
1196 942e2262 Michael Hanselmann
                       ([constants.OP_STATUS_QUEUED] *
1197 942e2262 Michael Hanselmann
                        (len(job.ops) - runcount))),
1198 942e2262 Michael Hanselmann
                      (["Res%s" % i for i in range(runcount)] +
1199 942e2262 Michael Hanselmann
                       ([None] * (len(job.ops) - runcount)))])
1200 942e2262 Michael Hanselmann
1201 942e2262 Michael Hanselmann
    # Must have been written and replicated
1202 942e2262 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1203 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1204 942e2262 Michael Hanselmann
1205 942e2262 Michael Hanselmann
  def testQueueShutdownWhileRunning(self):
1206 942e2262 Michael Hanselmann
    # Tests shutting down the queue while a job is running
1207 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1208 942e2262 Michael Hanselmann
1209 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1210 942e2262 Michael Hanselmann
           for i in range(3)]
1211 942e2262 Michael Hanselmann
1212 942e2262 Michael Hanselmann
    # Create job
1213 942e2262 Michael Hanselmann
    job_id = 2718211587
1214 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1215 942e2262 Michael Hanselmann
1216 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1217 942e2262 Michael Hanselmann
1218 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1219 942e2262 Michael Hanselmann
1220 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1221 942e2262 Michael Hanselmann
1222 942e2262 Michael Hanselmann
    # Run one opcode
1223 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1224 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1225 942e2262 Michael Hanselmann
1226 942e2262 Michael Hanselmann
    # Job goes back to queued
1227 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1228 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1229 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1230 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1231 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1232 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1233 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1234 942e2262 Michael Hanselmann
1235 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1236 942e2262 Michael Hanselmann
    for _ in range(3):
1237 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1238 942e2262 Michael Hanselmann
1239 942e2262 Michael Hanselmann
    # Run second opcode
1240 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1241 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1242 942e2262 Michael Hanselmann
1243 942e2262 Michael Hanselmann
    # Job goes back to queued
1244 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1245 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1246 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1247 942e2262 Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
1248 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1249 942e2262 Michael Hanselmann
                      ["Res0", "Res1", None]])
1250 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1251 942e2262 Michael Hanselmann
1252 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1253 942e2262 Michael Hanselmann
    for _ in range(3):
1254 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1255 942e2262 Michael Hanselmann
1256 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 2)
1257 942e2262 Michael Hanselmann
1258 942e2262 Michael Hanselmann
  def testQueueShutdownWithLockTimeout(self):
1259 942e2262 Michael Hanselmann
    # Tests shutting down while a lock acquire times out
1260 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1261 942e2262 Michael Hanselmann
1262 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1263 942e2262 Michael Hanselmann
           for i in range(3)]
1264 942e2262 Michael Hanselmann
1265 942e2262 Michael Hanselmann
    # Create job
1266 942e2262 Michael Hanselmann
    job_id = 1304231178
1267 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1268 942e2262 Michael Hanselmann
1269 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1270 942e2262 Michael Hanselmann
1271 942e2262 Michael Hanselmann
    acquire_timeout = False
1272 942e2262 Michael Hanselmann
1273 942e2262 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1274 942e2262 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1275 942e2262 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1276 942e2262 Michael Hanselmann
      if acquire_timeout:
1277 942e2262 Michael Hanselmann
        raise mcpu.LockAcquireTimeout()
1278 942e2262 Michael Hanselmann
1279 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1280 942e2262 Michael Hanselmann
1281 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1282 942e2262 Michael Hanselmann
1283 942e2262 Michael Hanselmann
    # Run one opcode
1284 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1285 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1286 942e2262 Michael Hanselmann
1287 942e2262 Michael Hanselmann
    # Job goes back to queued
1288 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1289 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1290 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1291 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1292 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1293 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1294 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1295 942e2262 Michael Hanselmann
1296 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1297 942e2262 Michael Hanselmann
    for _ in range(3):
1298 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1299 942e2262 Michael Hanselmann
1300 942e2262 Michael Hanselmann
    # The next opcode should have expiring lock acquires
1301 942e2262 Michael Hanselmann
    acquire_timeout = True
1302 942e2262 Michael Hanselmann
1303 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 1)
1304 942e2262 Michael Hanselmann
1305 942e2262 Michael Hanselmann
  def testQueueShutdownWhileInQueue(self):
1306 942e2262 Michael Hanselmann
    # This should never happen in reality (no new jobs are started by the
1307 942e2262 Michael Hanselmann
    # workerpool once a shutdown has been initiated), but it's better to test
1308 942e2262 Michael Hanselmann
    # the job processor for this scenario
1309 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1310 942e2262 Michael Hanselmann
1311 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1312 942e2262 Michael Hanselmann
           for i in range(5)]
1313 942e2262 Michael Hanselmann
1314 942e2262 Michael Hanselmann
    # Create job
1315 942e2262 Michael Hanselmann
    job_id = 2031
1316 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1317 942e2262 Michael Hanselmann
1318 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1319 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1320 942e2262 Michael Hanselmann
1321 942e2262 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1322 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1323 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1324 942e2262 Michael Hanselmann
                               for op in job.ops))
1325 942e2262 Michael Hanselmann
1326 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1327 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1328 942e2262 Michael Hanselmann
1329 942e2262 Michael Hanselmann
  def testQueueShutdownWhileWaitlockInQueue(self):
1330 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1331 942e2262 Michael Hanselmann
1332 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1333 942e2262 Michael Hanselmann
           for i in range(5)]
1334 942e2262 Michael Hanselmann
1335 942e2262 Michael Hanselmann
    # Create job
1336 942e2262 Michael Hanselmann
    job_id = 53125685
1337 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1338 942e2262 Michael Hanselmann
1339 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1340 942e2262 Michael Hanselmann
1341 942e2262 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1342 942e2262 Michael Hanselmann
1343 942e2262 Michael Hanselmann
    assert len(job.ops) == 5
1344 942e2262 Michael Hanselmann
1345 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1346 942e2262 Michael Hanselmann
1347 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1348 942e2262 Michael Hanselmann
1349 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1350 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1351 942e2262 Michael Hanselmann
1352 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
1353 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
1354 be760ba8 Michael Hanselmann
    # program was restarted
1355 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1356 be760ba8 Michael Hanselmann
1357 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1358 be760ba8 Michael Hanselmann
1359 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1360 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1361 be760ba8 Michael Hanselmann
             for i in range(10)]
1362 be760ba8 Michael Hanselmann
1363 be760ba8 Michael Hanselmann
      # Create job
1364 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
1365 be760ba8 Michael Hanselmann
1366 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1367 be760ba8 Michael Hanselmann
1368 be760ba8 Michael Hanselmann
      for _ in range(successcount):
1369 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1370 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
1371 be760ba8 Michael Hanselmann
1372 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1373 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
1374 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
1375 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
1376 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
1377 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
1378 be760ba8 Michael Hanselmann
1379 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
1380 be760ba8 Michael Hanselmann
1381 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
1382 8a3cd185 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1383 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
1384 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
1385 be760ba8 Michael Hanselmann
1386 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1387 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1388 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1389 be760ba8 Michael Hanselmann
1390 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1391 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1392 be760ba8 Michael Hanselmann
1393 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1394 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1395 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1396 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1397 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1398 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1399 be760ba8 Michael Hanselmann
1400 be760ba8 Michael Hanselmann
      if remaining == 0:
1401 be760ba8 Michael Hanselmann
        # Last opcode
1402 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1403 be760ba8 Michael Hanselmann
        break
1404 be760ba8 Michael Hanselmann
1405 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1406 be760ba8 Michael Hanselmann
1407 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1408 be760ba8 Michael Hanselmann
1409 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1410 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1411 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1412 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1413 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1414 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1415 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1416 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1417 be760ba8 Michael Hanselmann
                            for op in job.ops))
1418 be760ba8 Michael Hanselmann
1419 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1420 be760ba8 Michael Hanselmann
1421 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1422 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1423 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1424 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1425 be760ba8 Michael Hanselmann
1426 be760ba8 Michael Hanselmann
    # ... also after being restored
1427 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1428 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1429 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1430 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1431 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1432 be760ba8 Michael Hanselmann
1433 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1434 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1435 be760ba8 Michael Hanselmann
1436 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1437 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1438 be760ba8 Michael Hanselmann
1439 be760ba8 Michael Hanselmann
    # Create job
1440 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1441 be760ba8 Michael Hanselmann
1442 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1443 be760ba8 Michael Hanselmann
1444 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1445 be760ba8 Michael Hanselmann
1446 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1447 be760ba8 Michael Hanselmann
1448 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1449 be760ba8 Michael Hanselmann
1450 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1451 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1452 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1453 be760ba8 Michael Hanselmann
1454 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1455 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1456 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1457 be760ba8 Michael Hanselmann
1458 be760ba8 Michael Hanselmann
    messages = {
1459 be760ba8 Michael Hanselmann
      1: [
1460 be760ba8 Michael Hanselmann
        (None, "Hello"),
1461 be760ba8 Michael Hanselmann
        (None, "World"),
1462 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1463 be760ba8 Michael Hanselmann
        ],
1464 be760ba8 Michael Hanselmann
      4: [
1465 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1466 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1467 be760ba8 Michael Hanselmann
        ],
1468 be760ba8 Michael Hanselmann
      }
1469 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1470 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1471 be760ba8 Michael Hanselmann
           for i in range(5)]
1472 be760ba8 Michael Hanselmann
1473 be760ba8 Michael Hanselmann
    # Create job
1474 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1475 be760ba8 Michael Hanselmann
1476 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1477 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1478 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1479 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1480 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1481 be760ba8 Michael Hanselmann
1482 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1483 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1484 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1485 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1486 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1487 be760ba8 Michael Hanselmann
1488 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1489 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1490 be760ba8 Michael Hanselmann
1491 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1492 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1493 be760ba8 Michael Hanselmann
        if log_type:
1494 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1495 be760ba8 Michael Hanselmann
        else:
1496 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1497 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1498 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1499 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1500 be760ba8 Michael Hanselmann
1501 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1502 be760ba8 Michael Hanselmann
1503 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1504 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1505 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1506 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1507 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1508 be760ba8 Michael Hanselmann
1509 be760ba8 Michael Hanselmann
      if remaining == 0:
1510 be760ba8 Michael Hanselmann
        # Last opcode
1511 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1512 be760ba8 Michael Hanselmann
        break
1513 be760ba8 Michael Hanselmann
1514 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1515 be760ba8 Michael Hanselmann
1516 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1517 be760ba8 Michael Hanselmann
1518 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1519 ebb2a2a3 Michael Hanselmann
1520 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1521 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1522 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1523 be760ba8 Michael Hanselmann
1524 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1525 be760ba8 Michael Hanselmann
1526 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1527 be760ba8 Michael Hanselmann
1528 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1529 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1530 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1531 be760ba8 Michael Hanselmann
1532 be760ba8 Michael Hanselmann
    # Check each message
1533 be760ba8 Michael Hanselmann
    prevserial = -1
1534 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1535 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1536 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1537 be760ba8 Michael Hanselmann
        if exptype:
1538 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1539 be760ba8 Michael Hanselmann
        else:
1540 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1541 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1542 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1543 be760ba8 Michael Hanselmann
        prevserial = serial
1544 be760ba8 Michael Hanselmann
1545 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1546 be760ba8 Michael Hanselmann
    # Check serial
1547 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1548 be760ba8 Michael Hanselmann
1549 be760ba8 Michael Hanselmann
    # No filter
1550 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1551 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1552 be760ba8 Michael Hanselmann
                      for entry in entries])
1553 be760ba8 Michael Hanselmann
1554 be760ba8 Michael Hanselmann
    # Filter with serial
1555 be760ba8 Michael Hanselmann
    assert count > 3
1556 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1557 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1558 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1559 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1560 be760ba8 Michael Hanselmann
1561 be760ba8 Michael Hanselmann
    # No log message after highest serial
1562 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1563 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1564 be760ba8 Michael Hanselmann
1565 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1566 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1567 6a373640 Michael Hanselmann
1568 6a373640 Michael Hanselmann
    job_id = 15656
1569 6a373640 Michael Hanselmann
    ops = [
1570 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1571 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1572 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1573 6a373640 Michael Hanselmann
                          submit_jobs=[
1574 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1575 6a373640 Michael Hanselmann
                            ]),
1576 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1577 6a373640 Michael Hanselmann
                          submit_jobs=[
1578 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1579 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1580 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1581 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1582 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1583 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1584 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1585 6a373640 Michael Hanselmann
                            ]),
1586 6a373640 Michael Hanselmann
      ]
1587 6a373640 Michael Hanselmann
1588 6a373640 Michael Hanselmann
    # Create job
1589 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1590 6a373640 Michael Hanselmann
1591 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1592 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1593 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1594 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1595 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1596 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1597 6a373640 Michael Hanselmann
1598 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1599 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1600 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1601 6a373640 Michael Hanselmann
1602 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1603 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1604 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1605 6a373640 Michael Hanselmann
1606 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1607 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1608 6a373640 Michael Hanselmann
      self.assertFalse(success)
1609 6a373640 Michael Hanselmann
1610 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1611 6a373640 Michael Hanselmann
1612 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1613 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1614 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1615 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1616 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1617 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1618 6a373640 Michael Hanselmann
        # Last opcode
1619 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1620 6a373640 Michael Hanselmann
      else:
1621 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1622 6a373640 Michael Hanselmann
1623 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1624 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1625 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1626 6a373640 Michael Hanselmann
1627 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1628 6a373640 Michael Hanselmann
1629 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1630 6a373640 Michael Hanselmann
                                        for op in ops
1631 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1632 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1633 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1634 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1635 6a373640 Michael Hanselmann
1636 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1637 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1638 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1639 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1640 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1641 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1642 6a373640 Michael Hanselmann
1643 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1644 6a373640 Michael Hanselmann
1645 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1646 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1647 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1648 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1649 6a373640 Michael Hanselmann
1650 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1651 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1652 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1653 b95479a5 Michael Hanselmann
1654 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1655 b95479a5 Michael Hanselmann
1656 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1657 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1658 b95479a5 Michael Hanselmann
    job_id = 29929
1659 b95479a5 Michael Hanselmann
    ops = [
1660 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1661 b95479a5 Michael Hanselmann
                          depends=[
1662 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1663 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1664 b95479a5 Michael Hanselmann
                            ]),
1665 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1666 b95479a5 Michael Hanselmann
      ]
1667 b95479a5 Michael Hanselmann
1668 b95479a5 Michael Hanselmann
    # Create job
1669 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1670 b95479a5 Michael Hanselmann
1671 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1672 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1673 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1674 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1675 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1676 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1677 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1678 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1679 b95479a5 Michael Hanselmann
1680 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1681 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1682 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1683 b95479a5 Michael Hanselmann
1684 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1685 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1686 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1687 b95479a5 Michael Hanselmann
1688 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1689 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1690 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1691 b95479a5 Michael Hanselmann
1692 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1693 b95479a5 Michael Hanselmann
1694 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1695 b95479a5 Michael Hanselmann
1696 b95479a5 Michael Hanselmann
    counter = itertools.count()
1697 b95479a5 Michael Hanselmann
    while True:
1698 b95479a5 Michael Hanselmann
      attempt = counter.next()
1699 b95479a5 Michael Hanselmann
1700 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1701 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1702 b95479a5 Michael Hanselmann
1703 b95479a5 Michael Hanselmann
      if attempt < 2:
1704 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1705 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1706 b95479a5 Michael Hanselmann
      elif attempt == 2:
1707 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1708 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1709 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1710 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1711 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1712 b95479a5 Michael Hanselmann
      elif attempt < 5:
1713 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1714 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1715 b95479a5 Michael Hanselmann
      elif attempt == 5:
1716 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1717 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1718 b95479a5 Michael Hanselmann
      if attempt == 2:
1719 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1720 b95479a5 Michael Hanselmann
      elif attempt > 5:
1721 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1722 b95479a5 Michael Hanselmann
      else:
1723 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1724 b95479a5 Michael Hanselmann
1725 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1726 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1727 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1728 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1729 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1730 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1731 b95479a5 Michael Hanselmann
1732 b95479a5 Michael Hanselmann
      if attempt < 5:
1733 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1734 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1735 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1736 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1737 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1738 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1739 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1740 b95479a5 Michael Hanselmann
        continue
1741 b95479a5 Michael Hanselmann
1742 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1743 b95479a5 Michael Hanselmann
        # Last opcode
1744 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1745 b95479a5 Michael Hanselmann
        break
1746 b95479a5 Michael Hanselmann
1747 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1748 b95479a5 Michael Hanselmann
1749 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1750 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1751 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1752 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1753 b95479a5 Michael Hanselmann
1754 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1755 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1756 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1757 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1758 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1759 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1760 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1761 b95479a5 Michael Hanselmann
                               for op in job.ops))
1762 b95479a5 Michael Hanselmann
1763 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1764 b95479a5 Michael Hanselmann
1765 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1766 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1767 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1768 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1769 b95479a5 Michael Hanselmann
1770 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1771 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1772 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1773 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1774 b95479a5 Michael Hanselmann
1775 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1776 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1777 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1778 b95479a5 Michael Hanselmann
1779 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1780 b95479a5 Michael Hanselmann
1781 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1782 b95479a5 Michael Hanselmann
    job_id = 30876
1783 b95479a5 Michael Hanselmann
    ops = [
1784 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1785 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1786 b95479a5 Michael Hanselmann
                          depends=[
1787 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1788 b95479a5 Michael Hanselmann
                            ]),
1789 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1790 b95479a5 Michael Hanselmann
      ]
1791 b95479a5 Michael Hanselmann
1792 b95479a5 Michael Hanselmann
    # Create job
1793 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1794 b95479a5 Michael Hanselmann
1795 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1796 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1797 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1798 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1799 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1800 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1801 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1802 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1803 b95479a5 Michael Hanselmann
1804 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1805 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1806 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1807 b95479a5 Michael Hanselmann
1808 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1809 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1810 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1811 b95479a5 Michael Hanselmann
1812 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1813 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1814 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1815 b95479a5 Michael Hanselmann
1816 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1817 b95479a5 Michael Hanselmann
1818 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1819 b95479a5 Michael Hanselmann
1820 b95479a5 Michael Hanselmann
    counter = itertools.count()
1821 b95479a5 Michael Hanselmann
    while True:
1822 b95479a5 Michael Hanselmann
      attempt = counter.next()
1823 b95479a5 Michael Hanselmann
1824 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1825 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1826 b95479a5 Michael Hanselmann
1827 b95479a5 Michael Hanselmann
      if attempt == 0:
1828 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1829 b95479a5 Michael Hanselmann
        pass
1830 b95479a5 Michael Hanselmann
      elif attempt < 4:
1831 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1832 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1833 b95479a5 Michael Hanselmann
      elif attempt == 4:
1834 b95479a5 Michael Hanselmann
        # Other job was cancelled
1835 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1836 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1837 b95479a5 Michael Hanselmann
1838 b95479a5 Michael Hanselmann
      if attempt == 0:
1839 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1840 b95479a5 Michael Hanselmann
      else:
1841 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1842 b95479a5 Michael Hanselmann
1843 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1844 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1845 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1846 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1847 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1848 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1849 b95479a5 Michael Hanselmann
1850 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1851 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1852 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1853 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1854 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1855 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1856 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1857 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1858 b95479a5 Michael Hanselmann
        continue
1859 b95479a5 Michael Hanselmann
1860 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1861 b95479a5 Michael Hanselmann
        # Last opcode
1862 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1863 b95479a5 Michael Hanselmann
        break
1864 b95479a5 Michael Hanselmann
1865 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1866 b95479a5 Michael Hanselmann
1867 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1868 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1869 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1870 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1871 b95479a5 Michael Hanselmann
1872 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1873 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1874 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1875 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1876 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1877 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1878 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1879 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1880 b95479a5 Michael Hanselmann
1881 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1882 b95479a5 Michael Hanselmann
1883 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1884 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1885 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1886 b95479a5 Michael Hanselmann
1887 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1888 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1889 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1890 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1891 b95479a5 Michael Hanselmann
1892 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1893 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1894 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1895 b95479a5 Michael Hanselmann
1896 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1897 b95479a5 Michael Hanselmann
1898 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1899 b95479a5 Michael Hanselmann
    job_id = 11763
1900 b95479a5 Michael Hanselmann
    ops = [
1901 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1902 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1903 b95479a5 Michael Hanselmann
                          depends=[
1904 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1905 b95479a5 Michael Hanselmann
                            ]),
1906 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1907 b95479a5 Michael Hanselmann
      ]
1908 b95479a5 Michael Hanselmann
1909 b95479a5 Michael Hanselmann
    # Create job
1910 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1911 b95479a5 Michael Hanselmann
1912 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1913 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1914 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1915 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1916 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1917 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1918 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1919 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1920 b95479a5 Michael Hanselmann
1921 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1922 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1923 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1924 b95479a5 Michael Hanselmann
1925 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1926 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1927 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1928 b95479a5 Michael Hanselmann
1929 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1930 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1931 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1932 b95479a5 Michael Hanselmann
1933 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1934 b95479a5 Michael Hanselmann
1935 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1936 b95479a5 Michael Hanselmann
1937 b95479a5 Michael Hanselmann
    counter = itertools.count()
1938 b95479a5 Michael Hanselmann
    while True:
1939 b95479a5 Michael Hanselmann
      attempt = counter.next()
1940 b95479a5 Michael Hanselmann
1941 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1942 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1943 b95479a5 Michael Hanselmann
1944 b95479a5 Michael Hanselmann
      if attempt == 0:
1945 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1946 b95479a5 Michael Hanselmann
        pass
1947 b95479a5 Michael Hanselmann
      elif attempt < 4:
1948 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1949 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1950 b95479a5 Michael Hanselmann
      elif attempt == 4:
1951 b95479a5 Michael Hanselmann
        # Other job failed
1952 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1953 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1954 b95479a5 Michael Hanselmann
1955 b95479a5 Michael Hanselmann
      if attempt == 0:
1956 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1957 b95479a5 Michael Hanselmann
      else:
1958 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1959 b95479a5 Michael Hanselmann
1960 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1961 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1962 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1963 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1964 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1965 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1966 b95479a5 Michael Hanselmann
1967 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1968 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1969 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1970 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1971 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1972 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1973 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1974 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1975 b95479a5 Michael Hanselmann
        continue
1976 b95479a5 Michael Hanselmann
1977 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1978 b95479a5 Michael Hanselmann
        # Last opcode
1979 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1980 b95479a5 Michael Hanselmann
        break
1981 b95479a5 Michael Hanselmann
1982 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1983 b95479a5 Michael Hanselmann
1984 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1985 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1986 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1987 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1988 b95479a5 Michael Hanselmann
1989 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1990 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1991 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1992 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1993 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
1994 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
1995 b95479a5 Michael Hanselmann
1996 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
1997 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
1998 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
1999 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
2000 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
2001 b95479a5 Michael Hanselmann
2002 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
2003 b95479a5 Michael Hanselmann
2004 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2005 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2006 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
2007 b95479a5 Michael Hanselmann
2008 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2009 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2010 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2011 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2012 b95479a5 Michael Hanselmann
2013 be760ba8 Michael Hanselmann
2014 df5a5730 Michael Hanselmann
class TestEvaluateJobProcessorResult(unittest.TestCase):
2015 df5a5730 Michael Hanselmann
  def testFinished(self):
2016 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2017 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(30953)
2018 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2019 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.FINISHED)
2020 df5a5730 Michael Hanselmann
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2021 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2022 df5a5730 Michael Hanselmann
2023 df5a5730 Michael Hanselmann
  def testDefer(self):
2024 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2025 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(11326, priority=5463)
2026 df5a5730 Michael Hanselmann
    try:
2027 df5a5730 Michael Hanselmann
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2028 df5a5730 Michael Hanselmann
                                         jqueue._JobProcessor.DEFER)
2029 df5a5730 Michael Hanselmann
    except workerpool.DeferTask, err:
2030 df5a5730 Michael Hanselmann
      self.assertEqual(err.priority, 5463)
2031 df5a5730 Michael Hanselmann
    else:
2032 df5a5730 Michael Hanselmann
      self.fail("Didn't raise exception")
2033 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2034 df5a5730 Michael Hanselmann
2035 df5a5730 Michael Hanselmann
  def testWaitdep(self):
2036 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2037 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(21317)
2038 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2039 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.WAITDEP)
2040 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2041 df5a5730 Michael Hanselmann
2042 df5a5730 Michael Hanselmann
  def testOther(self):
2043 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2044 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(5813)
2045 df5a5730 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
2046 df5a5730 Michael Hanselmann
                      jqueue._EvaluateJobProcessorResult,
2047 df5a5730 Michael Hanselmann
                      depmgr, job, "Other result")
2048 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2049 df5a5730 Michael Hanselmann
2050 df5a5730 Michael Hanselmann
2051 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
2052 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
2053 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
2054 26d3fd2f Michael Hanselmann
    self.attempts = 0
2055 26d3fd2f Michael Hanselmann
    self.last_timeout = None
2056 26d3fd2f Michael Hanselmann
2057 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
2058 26d3fd2f Michael Hanselmann
    self.attempts += 1
2059 26d3fd2f Michael Hanselmann
    if self.timeouts:
2060 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
2061 26d3fd2f Michael Hanselmann
    else:
2062 26d3fd2f Michael Hanselmann
      timeout = None
2063 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
2064 26d3fd2f Michael Hanselmann
    return timeout
2065 26d3fd2f Michael Hanselmann
2066 26d3fd2f Michael Hanselmann
2067 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2068 26d3fd2f Michael Hanselmann
  def setUp(self):
2069 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
2070 26d3fd2f Michael Hanselmann
    self.job = None
2071 26d3fd2f Michael Hanselmann
    self.curop = None
2072 26d3fd2f Michael Hanselmann
    self.opcounter = None
2073 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
2074 26d3fd2f Michael Hanselmann
    self.retries = 0
2075 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
2076 26d3fd2f Michael Hanselmann
    self.prev_prio = None
2077 5fd6b694 Michael Hanselmann
    self.prev_status = None
2078 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
2079 26d3fd2f Michael Hanselmann
    self.gave_lock = None
2080 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
2081 26d3fd2f Michael Hanselmann
2082 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2083 26d3fd2f Michael Hanselmann
    job = self.job
2084 26d3fd2f Michael Hanselmann
2085 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
2086 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
2087 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2088 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2089 5fd6b694 Michael Hanselmann
2090 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2091 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2092 26d3fd2f Michael Hanselmann
2093 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
2094 26d3fd2f Michael Hanselmann
2095 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
2096 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
2097 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
2098 26d3fd2f Michael Hanselmann
2099 26d3fd2f Michael Hanselmann
    self.gave_lock = True
2100 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
2101 26d3fd2f Michael Hanselmann
2102 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
2103 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2104 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
2105 26d3fd2f Michael Hanselmann
      assert self.retries == 7
2106 26d3fd2f Michael Hanselmann
      self.retries = 0
2107 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
2108 26d3fd2f Michael Hanselmann
      return
2109 26d3fd2f Michael Hanselmann
2110 26d3fd2f Michael Hanselmann
    if self.retries > 0:
2111 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
2112 26d3fd2f Michael Hanselmann
      self.retries -= 1
2113 26d3fd2f Michael Hanselmann
      self.gave_lock = False
2114 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
2115 26d3fd2f Michael Hanselmann
2116 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2117 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2118 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
2119 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
2120 26d3fd2f Michael Hanselmann
2121 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
2122 26d3fd2f Michael Hanselmann
    job = self.job
2123 26d3fd2f Michael Hanselmann
2124 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
2125 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2126 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2127 5fd6b694 Michael Hanselmann
2128 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2129 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2130 26d3fd2f Michael Hanselmann
2131 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
2132 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
2133 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
2134 26d3fd2f Michael Hanselmann
2135 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
2136 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
2137 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
2138 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
2139 26d3fd2f Michael Hanselmann
2140 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
2141 26d3fd2f Michael Hanselmann
    job = self.job
2142 26d3fd2f Michael Hanselmann
2143 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
2144 26d3fd2f Michael Hanselmann
2145 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
2146 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
2147 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2148 26d3fd2f Michael Hanselmann
2149 26d3fd2f Michael Hanselmann
    if self.curop == 1:
2150 26d3fd2f Michael Hanselmann
      # Normal retry
2151 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2152 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
2153 26d3fd2f Michael Hanselmann
2154 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
2155 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
2156 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
2157 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2158 26d3fd2f Michael Hanselmann
2159 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
2160 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
2161 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
2162 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2163 26d3fd2f Michael Hanselmann
2164 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
2165 26d3fd2f Michael Hanselmann
2166 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
2167 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
2168 26d3fd2f Michael Hanselmann
2169 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
2170 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2171 26d3fd2f Michael Hanselmann
      self.retries = 0
2172 26d3fd2f Michael Hanselmann
2173 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
2174 26d3fd2f Michael Hanselmann
      # Normal retry
2175 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
2176 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2177 26d3fd2f Michael Hanselmann
2178 26d3fd2f Michael Hanselmann
    else:
2179 26d3fd2f Michael Hanselmann
      timeouts = []
2180 26d3fd2f Michael Hanselmann
      self.retries = 0
2181 26d3fd2f Michael Hanselmann
2182 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
2183 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
2184 26d3fd2f Michael Hanselmann
2185 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
2186 26d3fd2f Michael Hanselmann
2187 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
2188 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
2189 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
2190 26d3fd2f Michael Hanselmann
2191 26d3fd2f Michael Hanselmann
    return ts
2192 26d3fd2f Michael Hanselmann
2193 26d3fd2f Michael Hanselmann
  def testTimeout(self):
2194 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2195 26d3fd2f Michael Hanselmann
           for i in range(10)]
2196 26d3fd2f Michael Hanselmann
2197 26d3fd2f Michael Hanselmann
    # Create job
2198 26d3fd2f Michael Hanselmann
    job_id = 15801
2199 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2200 26d3fd2f Michael Hanselmann
    self.job = job
2201 26d3fd2f Michael Hanselmann
2202 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
2203 26d3fd2f Michael Hanselmann
2204 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2205 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
2206 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
2207 26d3fd2f Michael Hanselmann
2208 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
2209 26d3fd2f Michael Hanselmann
2210 5fd6b694 Michael Hanselmann
    while True:
2211 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
2212 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
2213 26d3fd2f Michael Hanselmann
2214 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2215 5fd6b694 Michael Hanselmann
2216 5fd6b694 Michael Hanselmann
      if self.curop is not None:
2217 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
2218 5fd6b694 Michael Hanselmann
2219 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
2220 5fd6b694 Michael Hanselmann
2221 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
2222 5fd6b694 Michael Hanselmann
      assert self.curop is not None
2223 5fd6b694 Michael Hanselmann
2224 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2225 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
2226 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2227 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2228 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
2229 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2230 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
2231 5fd6b694 Michael Hanselmann
2232 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2233 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2234 26d3fd2f Michael Hanselmann
        break
2235 26d3fd2f Michael Hanselmann
2236 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2237 26d3fd2f Michael Hanselmann
2238 5fd6b694 Michael Hanselmann
      if self.curop == 0:
2239 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
2240 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
2241 5fd6b694 Michael Hanselmann
2242 26d3fd2f Michael Hanselmann
      if self.gave_lock:
2243 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
2244 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2245 26d3fd2f Michael Hanselmann
      else:
2246 5fd6b694 Michael Hanselmann
        # Did not get locks
2247 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
2248 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2249 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
2250 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
2251 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2252 5fd6b694 Michael Hanselmann
2253 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
2254 5fd6b694 Michael Hanselmann
        # updated
2255 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
2256 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2257 5fd6b694 Michael Hanselmann
2258 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2259 26d3fd2f Michael Hanselmann
2260 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
2261 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2262 26d3fd2f Michael Hanselmann
2263 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
2264 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
2265 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
2266 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
2267 26d3fd2f Michael Hanselmann
2268 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2269 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2270 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2271 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
2272 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
2273 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2274 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2275 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2276 26d3fd2f Michael Hanselmann
                            for op in job.ops))
2277 26d3fd2f Michael Hanselmann
2278 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2279 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2280 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2281 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2282 26d3fd2f Michael Hanselmann
2283 26d3fd2f Michael Hanselmann
2284 4679547e Michael Hanselmann
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2285 4679547e Michael Hanselmann
  def setUp(self):
2286 4679547e Michael Hanselmann
    self.queue = _FakeQueueForProc()
2287 4679547e Michael Hanselmann
    self.opexecprio = []
2288 4679547e Michael Hanselmann
2289 4679547e Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2290 4679547e Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2291 4679547e Michael Hanselmann
    self.opexecprio.append(priority)
2292 4679547e Michael Hanselmann
2293 4679547e Michael Hanselmann
  def testChangePriorityWhileRunning(self):
2294 4679547e Michael Hanselmann
    # Tests changing the priority on a job while it has finished opcodes
2295 4679547e Michael Hanselmann
    # (successful) and more, unprocessed ones
2296 4679547e Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2297 4679547e Michael Hanselmann
           for i in range(3)]
2298 4679547e Michael Hanselmann
2299 4679547e Michael Hanselmann
    # Create job
2300 4679547e Michael Hanselmann
    job_id = 3499
2301 4679547e Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2302 4679547e Michael Hanselmann
2303 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2304 4679547e Michael Hanselmann
2305 4679547e Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2306 4679547e Michael Hanselmann
2307 4679547e Michael Hanselmann
    # Run first opcode
2308 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2309 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2310 4679547e Michael Hanselmann
2311 4679547e Michael Hanselmann
    # Job goes back to queued
2312 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2313 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2314 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2315 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2316 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
2317 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2318 4679547e Michael Hanselmann
                      ["Res0", None, None]])
2319 4679547e Michael Hanselmann
2320 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2321 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2322 4679547e Michael Hanselmann
2323 4679547e Michael Hanselmann
    # Change priority
2324 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(-10),
2325 4679547e Michael Hanselmann
                     (True,
2326 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2327 4679547e Michael Hanselmann
                       " been changed to -10")))
2328 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2329 4679547e Michael Hanselmann
2330 4679547e Michael Hanselmann
    # Process second opcode
2331 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2332 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2333 4679547e Michael Hanselmann
2334 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), -10)
2335 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2336 4679547e Michael Hanselmann
2337 4679547e Michael Hanselmann
    # Check status
2338 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2339 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2340 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2341 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2342 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2343 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2344 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2345 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2346 4679547e Michael Hanselmann
                      ["Res0", "Res1", None]])
2347 4679547e Michael Hanselmann
2348 4679547e Michael Hanselmann
    # Change priority once more
2349 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(5),
2350 4679547e Michael Hanselmann
                     (True,
2351 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2352 4679547e Michael Hanselmann
                       " been changed to 5")))
2353 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), 5)
2354 4679547e Michael Hanselmann
2355 4679547e Michael Hanselmann
    # Process third opcode
2356 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2357 4679547e Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2358 4679547e Michael Hanselmann
2359 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), 5)
2360 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2361 4679547e Michael Hanselmann
2362 4679547e Michael Hanselmann
    # Check status
2363 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2364 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2365 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2366 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2367 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2368 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2369 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2370 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS],
2371 4679547e Michael Hanselmann
                      ["Res0", "Res1", "Res2"]])
2372 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2373 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2374 4679547e Michael Hanselmann
2375 4679547e Michael Hanselmann
2376 1b5150ab Michael Hanselmann
class _IdOnlyFakeJob:
2377 1b5150ab Michael Hanselmann
  def __init__(self, job_id, priority=NotImplemented):
2378 1b5150ab Michael Hanselmann
    self.id = str(job_id)
2379 1b5150ab Michael Hanselmann
    self._priority = priority
2380 1b5150ab Michael Hanselmann
2381 1b5150ab Michael Hanselmann
  def CalcPriority(self):
2382 1b5150ab Michael Hanselmann
    return self._priority
2383 b95479a5 Michael Hanselmann
2384 1b5150ab Michael Hanselmann
2385 1b5150ab Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
2386 b95479a5 Michael Hanselmann
  def setUp(self):
2387 b95479a5 Michael Hanselmann
    self._status = []
2388 b95479a5 Michael Hanselmann
    self._queue = []
2389 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2390 b95479a5 Michael Hanselmann
2391 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
2392 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
2393 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
2394 b95479a5 Michael Hanselmann
    return result
2395 b95479a5 Michael Hanselmann
2396 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
2397 37d76f1e Michael Hanselmann
    self.assertFalse(self.jdm._lock.is_owned(),
2398 37d76f1e Michael Hanselmann
                     msg=("Must not own manager lock while re-adding jobs"
2399 37d76f1e Michael Hanselmann
                          " (potential deadlock)"))
2400 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
2401 b95479a5 Michael Hanselmann
2402 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
2403 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(17697)
2404 b95479a5 Michael Hanselmann
    job_id = str(28625)
2405 b95479a5 Michael Hanselmann
2406 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2407 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2408 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2409 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2410 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2411 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2412 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2413 b95479a5 Michael Hanselmann
      job_id: set([job]),
2414 b95479a5 Michael Hanselmann
      })
2415 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2416 fcb21ad7 Michael Hanselmann
      ("job/28625", None, None, [("job", [job.id])])
2417 fcb21ad7 Michael Hanselmann
      ])
2418 b95479a5 Michael Hanselmann
2419 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2420 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2421 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
2422 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2423 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2424 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2425 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2426 b95479a5 Michael Hanselmann
2427 942e2262 Michael Hanselmann
  def testNotFinalizedThenQueued(self):
2428 942e2262 Michael Hanselmann
    # This can happen on a queue shutdown
2429 942e2262 Michael Hanselmann
    job = _IdOnlyFakeJob(1320)
2430 942e2262 Michael Hanselmann
    job_id = str(22971)
2431 942e2262 Michael Hanselmann
2432 942e2262 Michael Hanselmann
    for i in range(5):
2433 942e2262 Michael Hanselmann
      if i > 2:
2434 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2435 942e2262 Michael Hanselmann
      else:
2436 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2437 942e2262 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2438 942e2262 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2439 942e2262 Michael Hanselmann
      self.assertFalse(self._status)
2440 942e2262 Michael Hanselmann
      self.assertFalse(self._queue)
2441 942e2262 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2442 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2443 942e2262 Michael Hanselmann
        job_id: set([job]),
2444 942e2262 Michael Hanselmann
        })
2445 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2446 942e2262 Michael Hanselmann
        ("job/22971", None, None, [("job", [job.id])])
2447 942e2262 Michael Hanselmann
        ])
2448 942e2262 Michael Hanselmann
2449 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
2450 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(5278)
2451 b95479a5 Michael Hanselmann
    job_id = str(9610)
2452 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
2453 b95479a5 Michael Hanselmann
2454 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2455 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2456 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2457 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2458 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2459 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2460 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2461 b95479a5 Michael Hanselmann
      job_id: set([job]),
2462 b95479a5 Michael Hanselmann
      })
2463 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2464 fcb21ad7 Michael Hanselmann
      ("job/9610", None, None, [("job", [job.id])])
2465 fcb21ad7 Michael Hanselmann
      ])
2466 b95479a5 Michael Hanselmann
2467 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2468 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2469 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2470 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2471 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2472 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2473 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2474 b95479a5 Michael Hanselmann
2475 b95479a5 Michael Hanselmann
  def testRequireError(self):
2476 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(21459)
2477 b95479a5 Michael Hanselmann
    job_id = str(25519)
2478 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
2479 b95479a5 Michael Hanselmann
2480 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2481 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2482 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2483 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2484 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2485 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2486 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2487 b95479a5 Michael Hanselmann
      job_id: set([job]),
2488 b95479a5 Michael Hanselmann
      })
2489 b95479a5 Michael Hanselmann
2490 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2491 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2492 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2493 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2494 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2495 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2496 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2497 b95479a5 Michael Hanselmann
2498 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
2499 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
2500 b95479a5 Michael Hanselmann
2501 b95479a5 Michael Hanselmann
    for end_status in dep_status:
2502 1b5150ab Michael Hanselmann
      job = _IdOnlyFakeJob(21343)
2503 b95479a5 Michael Hanselmann
      job_id = str(14609)
2504 b95479a5 Michael Hanselmann
2505 47099cd1 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2506 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2507 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2508 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2509 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2510 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2511 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2512 b95479a5 Michael Hanselmann
        job_id: set([job]),
2513 b95479a5 Michael Hanselmann
        })
2514 fcb21ad7 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2515 fcb21ad7 Michael Hanselmann
        ("job/14609", None, None, [("job", [job.id])])
2516 fcb21ad7 Michael Hanselmann
        ])
2517 b95479a5 Michael Hanselmann
2518 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
2519 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2520 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
2521 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2522 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2523 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
2524 fcb21ad7 Michael Hanselmann
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2525 b95479a5 Michael Hanselmann
2526 b95479a5 Michael Hanselmann
  def testNotify(self):
2527 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(8227)
2528 b95479a5 Michael Hanselmann
    job_id = str(4113)
2529 b95479a5 Michael Hanselmann
2530 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2531 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2532 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2533 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2534 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2535 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2536 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2537 b95479a5 Michael Hanselmann
      job_id: set([job]),
2538 b95479a5 Michael Hanselmann
      })
2539 b95479a5 Michael Hanselmann
2540 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
2541 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2542 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2543 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2544 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
2545 b95479a5 Michael Hanselmann
2546 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
2547 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(10102)
2548 b95479a5 Michael Hanselmann
    job_id = str(1271)
2549 b95479a5 Michael Hanselmann
2550 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2551 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2552 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2553 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2554 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2555 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2556 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2557 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2558 b95479a5 Michael Hanselmann
      job_id: set([job]),
2559 b95479a5 Michael Hanselmann
      })
2560 b95479a5 Michael Hanselmann
2561 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2562 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2563 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2564 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2565 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2566 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2567 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2568 b95479a5 Michael Hanselmann
2569 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2570 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(24273)
2571 b95479a5 Michael Hanselmann
    job_id = str(23885)
2572 b95479a5 Michael Hanselmann
2573 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2574 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2575 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2576 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2577 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2578 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2579 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2580 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2581 b95479a5 Michael Hanselmann
      job_id: set([job]),
2582 b95479a5 Michael Hanselmann
      })
2583 b95479a5 Michael Hanselmann
2584 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2585 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2586 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2587 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2588 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2589 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2590 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2591 b95479a5 Michael Hanselmann
2592 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2593 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(224)
2594 b95479a5 Michael Hanselmann
    job_id = str(3081)
2595 b95479a5 Michael Hanselmann
2596 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2597 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2598 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2599 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2600 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2601 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2602 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2603 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2604 b95479a5 Michael Hanselmann
      job_id: set(),
2605 b95479a5 Michael Hanselmann
      })
2606 b95479a5 Michael Hanselmann
2607 b95479a5 Michael Hanselmann
    # Force cleanup
2608 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2609 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2610 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2611 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2612 b95479a5 Michael Hanselmann
2613 fcb21ad7 Michael Hanselmann
  def testMultipleWaiting(self):
2614 fcb21ad7 Michael Hanselmann
    # Use a deterministic random generator
2615 fcb21ad7 Michael Hanselmann
    rnd = random.Random(21402)
2616 fcb21ad7 Michael Hanselmann
2617 fcb21ad7 Michael Hanselmann
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2618 fcb21ad7 Michael Hanselmann
2619 fcb21ad7 Michael Hanselmann
    waiters = dict((job_ids.pop(),
2620 1b5150ab Michael Hanselmann
                    set(map(_IdOnlyFakeJob,
2621 fcb21ad7 Michael Hanselmann
                            [job_ids.pop()
2622 fcb21ad7 Michael Hanselmann
                             for _ in range(rnd.randint(1, 20))])))
2623 fcb21ad7 Michael Hanselmann
                   for _ in range(10))
2624 fcb21ad7 Michael Hanselmann
2625 fcb21ad7 Michael Hanselmann
    # Ensure there are no duplicate job IDs
2626 fcb21ad7 Michael Hanselmann
    assert not utils.FindDuplicates(waiters.keys() +
2627 fcb21ad7 Michael Hanselmann
                                    [job.id
2628 fcb21ad7 Michael Hanselmann
                                     for jobs in waiters.values()
2629 fcb21ad7 Michael Hanselmann
                                     for job in jobs])
2630 fcb21ad7 Michael Hanselmann
2631 fcb21ad7 Michael Hanselmann
    # Register all jobs as waiters
2632 fcb21ad7 Michael Hanselmann
    for job_id, job in [(job_id, job)
2633 fcb21ad7 Michael Hanselmann
                        for (job_id, jobs) in waiters.items()
2634 fcb21ad7 Michael Hanselmann
                        for job in jobs]:
2635 fcb21ad7 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2636 fcb21ad7 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2637 fcb21ad7 Michael Hanselmann
                                              [constants.JOB_STATUS_SUCCESS])
2638 fcb21ad7 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2639 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._status)
2640 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._queue)
2641 fcb21ad7 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2642 fcb21ad7 Michael Hanselmann
2643 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, waiters)
2644 fcb21ad7 Michael Hanselmann
2645 fcb21ad7 Michael Hanselmann
    def _MakeSet((name, mode, owner_names, pending)):
2646 fcb21ad7 Michael Hanselmann
      return (name, mode, owner_names,
2647 fcb21ad7 Michael Hanselmann
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2648 fcb21ad7 Michael Hanselmann
2649 fcb21ad7 Michael Hanselmann
    def _CheckLockInfo():
2650 fcb21ad7 Michael Hanselmann
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2651 fcb21ad7 Michael Hanselmann
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2652 fcb21ad7 Michael Hanselmann
        ("job/%s" % job_id, None, None,
2653 fcb21ad7 Michael Hanselmann
         [("job", set([job.id for job in jobs]))])
2654 fcb21ad7 Michael Hanselmann
        for job_id, jobs in waiters.items()
2655 fcb21ad7 Michael Hanselmann
        if jobs
2656 fcb21ad7 Michael Hanselmann
        ]))
2657 fcb21ad7 Michael Hanselmann
2658 fcb21ad7 Michael Hanselmann
    _CheckLockInfo()
2659 fcb21ad7 Michael Hanselmann
2660 fcb21ad7 Michael Hanselmann
    # Notify in random order
2661 fcb21ad7 Michael Hanselmann
    for job_id in rnd.sample(waiters, len(waiters)):
2662 fcb21ad7 Michael Hanselmann
      # Remove from pending waiter list
2663 fcb21ad7 Michael Hanselmann
      jobs = waiters.pop(job_id)
2664 fcb21ad7 Michael Hanselmann
      for job in jobs:
2665 fcb21ad7 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2666 fcb21ad7 Michael Hanselmann
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2667 fcb21ad7 Michael Hanselmann
                                                [constants.JOB_STATUS_SUCCESS])
2668 fcb21ad7 Michael Hanselmann
        self.assertEqual(result, self.jdm.CONTINUE)
2669 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._status)
2670 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._queue)
2671 fcb21ad7 Michael Hanselmann
        self.assertFalse(self.jdm.JobWaiting(job))
2672 fcb21ad7 Michael Hanselmann
2673 fcb21ad7 Michael Hanselmann
      _CheckLockInfo()
2674 fcb21ad7 Michael Hanselmann
2675 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2676 fcb21ad7 Michael Hanselmann
2677 fcb21ad7 Michael Hanselmann
    assert not waiters
2678 fcb21ad7 Michael Hanselmann
2679 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2680 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(18937)
2681 b95479a5 Michael Hanselmann
2682 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2683 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2684 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2685 b95479a5 Michael Hanselmann
2686 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2687 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(30540)
2688 b95479a5 Michael Hanselmann
    job_id = str(23769)
2689 b95479a5 Michael Hanselmann
2690 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2691 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2692 b95479a5 Michael Hanselmann
2693 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2694 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2695 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2696 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2697 fcb21ad7 Michael Hanselmann
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2698 b95479a5 Michael Hanselmann
2699 b95479a5 Michael Hanselmann
2700 989a8bee Michael Hanselmann
if __name__ == "__main__":
2701 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()