Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.jqueue_unittest.py @ 91c17910

History | View | Annotate | Download (96.3 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 76b62028 Iustin Pop
# Copyright (C) 2010, 2011, 2012 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 4679547e Michael Hanselmann
import operator
33 989a8bee Michael Hanselmann
34 383477e9 Michael Hanselmann
try:
35 383477e9 Michael Hanselmann
  # pylint: disable=E0611
36 383477e9 Michael Hanselmann
  from pyinotify import pyinotify
37 383477e9 Michael Hanselmann
except ImportError:
38 383477e9 Michael Hanselmann
  import pyinotify
39 383477e9 Michael Hanselmann
40 989a8bee Michael Hanselmann
from ganeti import constants
41 989a8bee Michael Hanselmann
from ganeti import utils
42 989a8bee Michael Hanselmann
from ganeti import errors
43 989a8bee Michael Hanselmann
from ganeti import jqueue
44 8f5c488d Michael Hanselmann
from ganeti import opcodes
45 8f5c488d Michael Hanselmann
from ganeti import compat
46 26d3fd2f Michael Hanselmann
from ganeti import mcpu
47 fcb21ad7 Michael Hanselmann
from ganeti import query
48 df5a5730 Michael Hanselmann
from ganeti import workerpool
49 989a8bee Michael Hanselmann
50 989a8bee Michael Hanselmann
import testutils
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
53 989a8bee Michael Hanselmann
class _FakeJob:
54 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
55 989a8bee Michael Hanselmann
    self.id = job_id
56 c0f6d0d8 Michael Hanselmann
    self.writable = False
57 989a8bee Michael Hanselmann
    self._status = status
58 989a8bee Michael Hanselmann
    self._log = []
59 989a8bee Michael Hanselmann
60 989a8bee Michael Hanselmann
  def SetStatus(self, status):
61 989a8bee Michael Hanselmann
    self._status = status
62 989a8bee Michael Hanselmann
63 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
64 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
65 989a8bee Michael Hanselmann
66 989a8bee Michael Hanselmann
  def CalcStatus(self):
67 989a8bee Michael Hanselmann
    return self._status
68 989a8bee Michael Hanselmann
69 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
70 989a8bee Michael Hanselmann
    result = []
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    for name in fields:
73 989a8bee Michael Hanselmann
      if name == "status":
74 989a8bee Michael Hanselmann
        result.append(self._status)
75 989a8bee Michael Hanselmann
      else:
76 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
    return result
79 989a8bee Michael Hanselmann
80 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
81 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
82 989a8bee Michael Hanselmann
83 989a8bee Michael Hanselmann
    if newer_than is None:
84 989a8bee Michael Hanselmann
      return self._log
85 989a8bee Michael Hanselmann
86 989a8bee Michael Hanselmann
    return self._log[newer_than:]
87 989a8bee Michael Hanselmann
88 989a8bee Michael Hanselmann
89 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
90 989a8bee Michael Hanselmann
  def testStatus(self):
91 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
92 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
93 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
94 989a8bee Michael Hanselmann
95 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
96 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
97 989a8bee Michael Hanselmann
98 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
99 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
100 989a8bee Michael Hanselmann
101 989a8bee Michael Hanselmann
    # job.id is used by checker
102 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
103 989a8bee Michael Hanselmann
104 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
105 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
106 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
107 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
108 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
109 989a8bee Michael Hanselmann
110 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
111 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112 989a8bee Michael Hanselmann
113 989a8bee Michael Hanselmann
  def testFinalStatus(self):
114 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
115 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
116 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
117 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
118 989a8bee Michael Hanselmann
      # a change immediately
119 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
120 989a8bee Michael Hanselmann
121 989a8bee Michael Hanselmann
  def testLog(self):
122 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
123 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
124 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
127 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
128 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
129 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
132 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
133 989a8bee Michael Hanselmann
134 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
135 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
136 989a8bee Michael Hanselmann
137 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
138 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
139 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
140 989a8bee Michael Hanselmann
141 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
142 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
143 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
144 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
145 989a8bee Michael Hanselmann
146 989a8bee Michael Hanselmann
147 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
148 989a8bee Michael Hanselmann
  def setUp(self):
149 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
150 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
151 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
152 989a8bee Michael Hanselmann
153 989a8bee Michael Hanselmann
  def tearDown(self):
154 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
155 989a8bee Michael Hanselmann
156 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
157 989a8bee Michael Hanselmann
    try:
158 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
159 989a8bee Michael Hanselmann
    except EnvironmentError, err:
160 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
161 989a8bee Michael Hanselmann
    else:
162 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
163 989a8bee Michael Hanselmann
164 989a8bee Michael Hanselmann
  def testClose(self):
165 989a8bee Michael Hanselmann
    for wait in [False, True]:
166 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
167 989a8bee Michael Hanselmann
      try:
168 989a8bee Michael Hanselmann
        if wait:
169 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
170 989a8bee Michael Hanselmann
      finally:
171 989a8bee Michael Hanselmann
        waiter.Close()
172 989a8bee Michael Hanselmann
173 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
174 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
175 989a8bee Michael Hanselmann
176 989a8bee Michael Hanselmann
  def testChangingFile(self):
177 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
178 989a8bee Michael Hanselmann
    try:
179 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
180 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
181 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
182 989a8bee Michael Hanselmann
    finally:
183 989a8bee Michael Hanselmann
      waiter.Close()
184 989a8bee Michael Hanselmann
185 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
186 989a8bee Michael Hanselmann
187 989a8bee Michael Hanselmann
  def testChangingFile2(self):
188 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
189 989a8bee Michael Hanselmann
    try:
190 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
191 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
192 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
193 989a8bee Michael Hanselmann
194 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
195 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
196 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
197 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
198 989a8bee Michael Hanselmann
    finally:
199 989a8bee Michael Hanselmann
      waiter.Close()
200 989a8bee Michael Hanselmann
201 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
202 989a8bee Michael Hanselmann
203 989a8bee Michael Hanselmann
204 383477e9 Michael Hanselmann
class _FailingWatchManager(pyinotify.WatchManager):
205 383477e9 Michael Hanselmann
  """Subclass of L{pyinotify.WatchManager} which always fails to register.
206 383477e9 Michael Hanselmann

207 383477e9 Michael Hanselmann
  """
208 383477e9 Michael Hanselmann
  def add_watch(self, filename, mask):
209 383477e9 Michael Hanselmann
    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
210 383477e9 Michael Hanselmann
                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
211 383477e9 Michael Hanselmann
212 383477e9 Michael Hanselmann
    return {
213 383477e9 Michael Hanselmann
      filename: -1,
214 383477e9 Michael Hanselmann
      }
215 383477e9 Michael Hanselmann
216 383477e9 Michael Hanselmann
217 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
218 989a8bee Michael Hanselmann
  def setUp(self):
219 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
220 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
221 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
222 989a8bee Michael Hanselmann
223 989a8bee Michael Hanselmann
  def tearDown(self):
224 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
227 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
228 989a8bee Michael Hanselmann
229 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
230 989a8bee Michael Hanselmann
    return None
231 989a8bee Michael Hanselmann
232 989a8bee Michael Hanselmann
  def testNoChanges(self):
233 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
234 989a8bee Michael Hanselmann
235 989a8bee Michael Hanselmann
    # No change
236 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
237 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
238 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
239 989a8bee Michael Hanselmann
240 989a8bee Michael Hanselmann
    # No previous information
241 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
242 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
243 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
244 989a8bee Michael Hanselmann
245 989a8bee Michael Hanselmann
  def testLostJob(self):
246 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
247 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
248 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
249 989a8bee Michael Hanselmann
250 383477e9 Michael Hanselmann
  def testNonExistentFile(self):
251 383477e9 Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
252 383477e9 Michael Hanselmann
253 383477e9 Michael Hanselmann
    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
254 383477e9 Michael Hanselmann
    self.assertFalse(os.path.exists(filename))
255 383477e9 Michael Hanselmann
256 383477e9 Michael Hanselmann
    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
257 383477e9 Michael Hanselmann
                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
258 383477e9 Michael Hanselmann
                                             _waiter_cls=NotImplemented))
259 383477e9 Michael Hanselmann
    self.assertTrue(result is None)
260 383477e9 Michael Hanselmann
261 383477e9 Michael Hanselmann
  def testInotifyError(self):
262 383477e9 Michael Hanselmann
    jobfile_waiter_cls = \
263 383477e9 Michael Hanselmann
      compat.partial(jqueue._JobFileChangesWaiter,
264 383477e9 Michael Hanselmann
                     _inotify_wm_cls=_FailingWatchManager)
265 383477e9 Michael Hanselmann
266 383477e9 Michael Hanselmann
    jobchange_waiter_cls = \
267 383477e9 Michael Hanselmann
      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
268 383477e9 Michael Hanselmann
269 383477e9 Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
270 383477e9 Michael Hanselmann
271 383477e9 Michael Hanselmann
    # Test if failing to watch a job file (e.g. due to
272 383477e9 Michael Hanselmann
    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
273 383477e9 Michael Hanselmann
    self.assertRaises(errors.InotifyError, wfjc,
274 383477e9 Michael Hanselmann
                      self.filename, self._LoadWaitingJob,
275 383477e9 Michael Hanselmann
                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
276 383477e9 Michael Hanselmann
                      _waiter_cls=jobchange_waiter_cls)
277 383477e9 Michael Hanselmann
278 989a8bee Michael Hanselmann
279 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
280 6760e4ed Michael Hanselmann
  def test(self):
281 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
282 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
283 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
284 6760e4ed Michael Hanselmann
285 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
286 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
287 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
288 6760e4ed Michael Hanselmann
289 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
290 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
291 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
292 6760e4ed Michael Hanselmann
293 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
294 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
295 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
296 6760e4ed Michael Hanselmann
297 6760e4ed Michael Hanselmann
298 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
299 8f5c488d Michael Hanselmann
  def testDefaults(self):
300 8f5c488d Michael Hanselmann
    def _Check(op):
301 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
302 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
303 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
304 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
305 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
306 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
307 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
308 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
309 8f5c488d Michael Hanselmann
310 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
311 8f5c488d Michael Hanselmann
    _Check(op1)
312 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
313 8f5c488d Michael Hanselmann
    _Check(op2)
314 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
315 8f5c488d Michael Hanselmann
316 8f5c488d Michael Hanselmann
  def testPriority(self):
317 8f5c488d Michael Hanselmann
    def _Check(op):
318 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
319 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
320 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
321 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
322 8f5c488d Michael Hanselmann
323 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
324 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
325 8f5c488d Michael Hanselmann
    _Check(op1)
326 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
327 8f5c488d Michael Hanselmann
    _Check(op2)
328 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
329 8f5c488d Michael Hanselmann
330 8f5c488d Michael Hanselmann
331 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
332 4679547e Michael Hanselmann
  def testNoOpCodes(self):
333 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
334 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
335 5f6b0b71 Michael Hanselmann
336 8f5c488d Michael Hanselmann
  def testDefaults(self):
337 8f5c488d Michael Hanselmann
    job_id = 4260
338 8f5c488d Michael Hanselmann
    ops = [
339 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
340 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
341 8f5c488d Michael Hanselmann
      ]
342 8f5c488d Michael Hanselmann
343 8f5c488d Michael Hanselmann
    def _Check(job):
344 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
345 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
346 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
347 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
348 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
349 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
350 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
351 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
352 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
353 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
354 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
355 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
356 a06c6ae8 Michael Hanselmann
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
357 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
358 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
359 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
360 8a3cd185 Michael Hanselmann
      self.assertFalse(job.archived)
361 8f5c488d Michael Hanselmann
362 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
363 8f5c488d Michael Hanselmann
    _Check(job1)
364 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
365 8f5c488d Michael Hanselmann
    _Check(job2)
366 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
367 8f5c488d Michael Hanselmann
368 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
369 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
370 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
371 c0f6d0d8 Michael Hanselmann
372 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
373 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
374 c0f6d0d8 Michael Hanselmann
375 8a3cd185 Michael Hanselmann
  def testArchived(self):
376 8a3cd185 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
377 8a3cd185 Michael Hanselmann
    self.assertFalse(job.archived)
378 8a3cd185 Michael Hanselmann
379 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
380 8a3cd185 Michael Hanselmann
    self.assertTrue(newjob.archived)
381 8a3cd185 Michael Hanselmann
382 8a3cd185 Michael Hanselmann
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
383 8a3cd185 Michael Hanselmann
    self.assertFalse(newjob2.archived)
384 8a3cd185 Michael Hanselmann
385 8f5c488d Michael Hanselmann
  def testPriority(self):
386 8f5c488d Michael Hanselmann
    job_id = 4283
387 8f5c488d Michael Hanselmann
    ops = [
388 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
389 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
390 8f5c488d Michael Hanselmann
      ]
391 8f5c488d Michael Hanselmann
392 8f5c488d Michael Hanselmann
    def _Check(job):
393 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
394 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
395 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
396 8f5c488d Michael Hanselmann
397 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
398 8f5c488d Michael Hanselmann
    _Check(job)
399 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
400 8f5c488d Michael Hanselmann
                            for op in job.ops))
401 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
402 8f5c488d Michael Hanselmann
403 8f5c488d Michael Hanselmann
    # Increase first
404 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
405 8f5c488d Michael Hanselmann
    _Check(job)
406 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
407 8f5c488d Michael Hanselmann
408 8f5c488d Michael Hanselmann
    # Mark opcode as finished
409 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
410 8f5c488d Michael Hanselmann
    _Check(job)
411 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
412 8f5c488d Michael Hanselmann
413 8f5c488d Michael Hanselmann
    # Increase second
414 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
415 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
416 8f5c488d Michael Hanselmann
417 8f5c488d Michael Hanselmann
    # Test increasing first
418 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
419 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
420 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
421 8f5c488d Michael Hanselmann
422 4679547e Michael Hanselmann
  def _JobForPriority(self, job_id):
423 4679547e Michael Hanselmann
    ops = [
424 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
425 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
426 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
427 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
428 4679547e Michael Hanselmann
      ]
429 4679547e Michael Hanselmann
430 4679547e Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
431 4679547e Michael Hanselmann
432 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
433 4679547e Michael Hanselmann
                               for op in job.ops))
434 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
435 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
436 4679547e Michael Hanselmann
                                for op in job.ops))
437 4679547e Michael Hanselmann
438 4679547e Michael Hanselmann
    return job
439 4679547e Michael Hanselmann
440 4679547e Michael Hanselmann
  def testChangePriorityAllQueued(self):
441 4679547e Michael Hanselmann
    job = self._JobForPriority(24984)
442 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
443 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
444 4679547e Michael Hanselmann
                               for op in job.ops))
445 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
446 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
447 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
448 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
449 3c631ea2 Michael Hanselmann
                                for op in job.ops))
450 4679547e Michael Hanselmann
    self.assertEqual(result,
451 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job 24984 have"
452 4679547e Michael Hanselmann
                             " been changed to -10")))
453 4679547e Michael Hanselmann
454 4679547e Michael Hanselmann
  def testChangePriorityAllFinished(self):
455 4679547e Michael Hanselmann
    job = self._JobForPriority(16405)
456 4679547e Michael Hanselmann
457 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
458 4679547e Michael Hanselmann
      if idx > 2:
459 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
460 4679547e Michael Hanselmann
      else:
461 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
462 4679547e Michael Hanselmann
463 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
464 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
465 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
466 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
468 4679547e Michael Hanselmann
                               for op in job.ops))
469 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
470 4679547e Michael Hanselmann
                                for op in job.ops))
471 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
472 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
473 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
474 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
475 4679547e Michael Hanselmann
      constants.OP_STATUS_ERROR,
476 4679547e Michael Hanselmann
      ])
477 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 16405 is finished"))
478 4679547e Michael Hanselmann
479 4679547e Michael Hanselmann
  def testChangePriorityCancelling(self):
480 4679547e Michael Hanselmann
    job = self._JobForPriority(31572)
481 4679547e Michael Hanselmann
482 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
483 4679547e Michael Hanselmann
      if idx > 1:
484 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
485 4679547e Michael Hanselmann
      else:
486 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
487 4679547e Michael Hanselmann
488 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
489 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
490 4679547e Michael Hanselmann
    result = job.ChangePriority(5)
491 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
492 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
493 4679547e Michael Hanselmann
                               for op in job.ops))
494 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
495 4679547e Michael Hanselmann
                                for op in job.ops))
496 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
497 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
498 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
499 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
500 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
501 4679547e Michael Hanselmann
      ])
502 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
503 4679547e Michael Hanselmann
504 4679547e Michael Hanselmann
  def testChangePriorityFirstRunning(self):
505 4679547e Michael Hanselmann
    job = self._JobForPriority(1716215889)
506 4679547e Michael Hanselmann
507 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
508 4679547e Michael Hanselmann
      if idx == 0:
509 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
510 4679547e Michael Hanselmann
      else:
511 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
512 4679547e Michael Hanselmann
513 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
514 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
515 4679547e Michael Hanselmann
    result = job.ChangePriority(7)
516 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
517 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
518 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
519 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
520 3c631ea2 Michael Hanselmann
                                for op in job.ops))
521 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
522 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
523 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
524 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
525 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
526 4679547e Michael Hanselmann
      ])
527 4679547e Michael Hanselmann
    self.assertEqual(result,
528 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
529 4679547e Michael Hanselmann
                             " 1716215889 have been changed to 7")))
530 4679547e Michael Hanselmann
531 4679547e Michael Hanselmann
  def testChangePriorityLastRunning(self):
532 4679547e Michael Hanselmann
    job = self._JobForPriority(1308)
533 4679547e Michael Hanselmann
534 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
535 4679547e Michael Hanselmann
      if idx == (len(job.ops) - 1):
536 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
537 4679547e Michael Hanselmann
      else:
538 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
539 4679547e Michael Hanselmann
540 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
541 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
542 4679547e Michael Hanselmann
    result = job.ChangePriority(-3)
543 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
544 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
545 4679547e Michael Hanselmann
                               for op in job.ops))
546 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
547 4679547e Michael Hanselmann
                                for op in job.ops))
548 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
549 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
550 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
551 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
552 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
553 4679547e Michael Hanselmann
      ])
554 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
555 4679547e Michael Hanselmann
556 4679547e Michael Hanselmann
  def testChangePrioritySecondOpcodeRunning(self):
557 4679547e Michael Hanselmann
    job = self._JobForPriority(27701)
558 4679547e Michael Hanselmann
559 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
560 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
561 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
562 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
563 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_QUEUED
564 4679547e Michael Hanselmann
565 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
566 4679547e Michael Hanselmann
    result = job.ChangePriority(-19)
567 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -19)
568 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
569 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
570 4679547e Michael Hanselmann
                      -19, -19])
571 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
572 3c631ea2 Michael Hanselmann
                                for op in job.ops))
573 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
574 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
575 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
576 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
577 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
578 4679547e Michael Hanselmann
      ])
579 4679547e Michael Hanselmann
    self.assertEqual(result,
580 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
581 4679547e Michael Hanselmann
                             " 27701 have been changed to -19")))
582 4679547e Michael Hanselmann
583 4679547e Michael Hanselmann
  def testChangePriorityWithInconsistentJob(self):
584 4679547e Michael Hanselmann
    job = self._JobForPriority(30097)
585 4679547e Michael Hanselmann
586 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
587 4679547e Michael Hanselmann
588 4679547e Michael Hanselmann
    # This job is invalid (as it has two opcodes marked as running) and make
589 4679547e Michael Hanselmann
    # the call fail because an unprocessed opcode precedes a running one (which
590 4679547e Michael Hanselmann
    # should never happen in reality)
591 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
592 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
593 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
594 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_RUNNING
595 4679547e Michael Hanselmann
596 4679547e Michael Hanselmann
    self.assertRaises(AssertionError, job.ChangePriority, 19)
597 4679547e Michael Hanselmann
598 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
599 db5bce34 Michael Hanselmann
    def _Queued(ops):
600 db5bce34 Michael Hanselmann
      # The default status is "queued"
601 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
602 db5bce34 Michael Hanselmann
                              for op in ops))
603 db5bce34 Michael Hanselmann
604 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
605 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
606 db5bce34 Michael Hanselmann
607 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
608 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
609 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
610 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
611 db5bce34 Michael Hanselmann
612 db5bce34 Michael Hanselmann
    def _Running(ops):
613 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
614 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
615 db5bce34 Michael Hanselmann
      for op in ops[2:]:
616 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
617 db5bce34 Michael Hanselmann
618 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
619 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
620 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
621 db5bce34 Michael Hanselmann
      for op in ops[2:]:
622 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
623 db5bce34 Michael Hanselmann
624 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
625 db5bce34 Michael Hanselmann
      for op in ops:
626 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
627 db5bce34 Michael Hanselmann
628 db5bce34 Michael Hanselmann
    def _Canceled(ops):
629 db5bce34 Michael Hanselmann
      for op in ops:
630 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
631 db5bce34 Michael Hanselmann
632 db5bce34 Michael Hanselmann
    def _Error1(ops):
633 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
634 db5bce34 Michael Hanselmann
        if idx > 3:
635 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
636 db5bce34 Michael Hanselmann
        else:
637 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
638 db5bce34 Michael Hanselmann
639 db5bce34 Michael Hanselmann
    def _Error2(ops):
640 db5bce34 Michael Hanselmann
      for op in ops:
641 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
642 db5bce34 Michael Hanselmann
643 db5bce34 Michael Hanselmann
    def _Success(ops):
644 db5bce34 Michael Hanselmann
      for op in ops:
645 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
646 db5bce34 Michael Hanselmann
647 db5bce34 Michael Hanselmann
    tests = {
648 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
649 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
650 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
651 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
652 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
653 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
654 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
655 db5bce34 Michael Hanselmann
      }
656 db5bce34 Michael Hanselmann
657 db5bce34 Michael Hanselmann
    def _NewJob():
658 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
659 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
660 c0f6d0d8 Michael Hanselmann
                              True)
661 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
663 db5bce34 Michael Hanselmann
                              for op in job.ops))
664 db5bce34 Michael Hanselmann
      return job
665 db5bce34 Michael Hanselmann
666 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
667 db5bce34 Michael Hanselmann
      sttests = tests[status]
668 db5bce34 Michael Hanselmann
      assert sttests
669 db5bce34 Michael Hanselmann
      for fn in sttests:
670 db5bce34 Michael Hanselmann
        job = _NewJob()
671 db5bce34 Michael Hanselmann
        fn(job.ops)
672 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
673 db5bce34 Michael Hanselmann
674 8f5c488d Michael Hanselmann
675 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
676 be760ba8 Michael Hanselmann
  def __init__(self):
677 b95479a5 Michael Hanselmann
    self._checks = []
678 b95479a5 Michael Hanselmann
    self._notifications = []
679 b95479a5 Michael Hanselmann
    self._waiting = set()
680 b95479a5 Michael Hanselmann
681 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
682 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
683 b95479a5 Michael Hanselmann
684 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
685 b95479a5 Michael Hanselmann
    return len(self._checks)
686 b95479a5 Michael Hanselmann
687 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
688 b95479a5 Michael Hanselmann
    return len(self._waiting)
689 b95479a5 Michael Hanselmann
690 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
691 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
692 b95479a5 Michael Hanselmann
693 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
694 b95479a5 Michael Hanselmann
    return job in self._waiting
695 b95479a5 Michael Hanselmann
696 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
697 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
698 b95479a5 Michael Hanselmann
699 b95479a5 Michael Hanselmann
    assert exp_job == job
700 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
701 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
702 b95479a5 Michael Hanselmann
703 b95479a5 Michael Hanselmann
    (result_status, _) = result
704 b95479a5 Michael Hanselmann
705 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
706 b95479a5 Michael Hanselmann
      self._waiting.add(job)
707 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
708 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
709 b95479a5 Michael Hanselmann
710 b95479a5 Michael Hanselmann
    return result
711 b95479a5 Michael Hanselmann
712 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
713 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
714 b95479a5 Michael Hanselmann
715 b95479a5 Michael Hanselmann
716 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
717 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
718 b95479a5 Michael Hanselmann
    return False
719 b95479a5 Michael Hanselmann
720 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
721 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
722 b95479a5 Michael Hanselmann
723 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
724 b95479a5 Michael Hanselmann
    pass
725 b95479a5 Michael Hanselmann
726 b95479a5 Michael Hanselmann
727 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
728 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
729 be760ba8 Michael Hanselmann
    self._acquired = False
730 ebb2a2a3 Michael Hanselmann
    self._updates = []
731 6a373640 Michael Hanselmann
    self._submitted = []
732 942e2262 Michael Hanselmann
    self._accepting_jobs = True
733 6a373640 Michael Hanselmann
734 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
735 be760ba8 Michael Hanselmann
736 b95479a5 Michael Hanselmann
    if depmgr:
737 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
738 b95479a5 Michael Hanselmann
    else:
739 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
740 b95479a5 Michael Hanselmann
741 be760ba8 Michael Hanselmann
  def IsAcquired(self):
742 be760ba8 Michael Hanselmann
    return self._acquired
743 be760ba8 Michael Hanselmann
744 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
745 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
746 ebb2a2a3 Michael Hanselmann
747 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
748 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
749 6a373640 Michael Hanselmann
750 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
751 be760ba8 Michael Hanselmann
    assert shared == 1
752 be760ba8 Michael Hanselmann
    self._acquired = True
753 be760ba8 Michael Hanselmann
754 be760ba8 Michael Hanselmann
  def release(self):
755 be760ba8 Michael Hanselmann
    assert self._acquired
756 be760ba8 Michael Hanselmann
    self._acquired = False
757 be760ba8 Michael Hanselmann
758 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
759 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
760 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
761 be760ba8 Michael Hanselmann
762 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
763 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
764 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
765 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
766 6a373640 Michael Hanselmann
    return job_ids
767 6a373640 Michael Hanselmann
768 942e2262 Michael Hanselmann
  def StopAcceptingJobs(self):
769 942e2262 Michael Hanselmann
    self._accepting_jobs = False
770 942e2262 Michael Hanselmann
771 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
772 942e2262 Michael Hanselmann
    return self._accepting_jobs
773 942e2262 Michael Hanselmann
774 be760ba8 Michael Hanselmann
775 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
776 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
777 ebb2a2a3 Michael Hanselmann
    self._queue = queue
778 be760ba8 Michael Hanselmann
    self._before_start = before_start
779 be760ba8 Michael Hanselmann
    self._after_start = after_start
780 be760ba8 Michael Hanselmann
781 e4e59de8 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None):
782 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
783 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
784 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
785 be760ba8 Michael Hanselmann
786 be760ba8 Michael Hanselmann
    if self._before_start:
787 e4e59de8 Michael Hanselmann
      self._before_start(timeout, cbs.CurrentPriority())
788 be760ba8 Michael Hanselmann
789 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
790 be760ba8 Michael Hanselmann
791 be760ba8 Michael Hanselmann
    if self._after_start:
792 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
793 be760ba8 Michael Hanselmann
794 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
795 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
796 ebb2a2a3 Michael Hanselmann
797 be760ba8 Michael Hanselmann
    if op.fail:
798 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
799 be760ba8 Michael Hanselmann
800 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
801 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
802 6a373640 Michael Hanselmann
803 be760ba8 Michael Hanselmann
    return op.result
804 be760ba8 Michael Hanselmann
805 be760ba8 Michael Hanselmann
806 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
807 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
808 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
809 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
810 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
811 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
812 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
813 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
814 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
815 be760ba8 Michael Hanselmann
    return job
816 be760ba8 Michael Hanselmann
817 26d3fd2f Michael Hanselmann
818 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
819 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
820 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
821 be760ba8 Michael Hanselmann
                      for op in job.ops)
822 be760ba8 Michael Hanselmann
823 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
824 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
825 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
826 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
827 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
828 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
829 be760ba8 Michael Hanselmann
                      job.start_timestamp,
830 be760ba8 Michael Hanselmann
                      job.end_timestamp])
831 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
832 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
833 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
834 be760ba8 Michael Hanselmann
835 be760ba8 Michael Hanselmann
  def testSuccess(self):
836 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
837 be760ba8 Michael Hanselmann
838 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
839 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
840 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
841 be760ba8 Michael Hanselmann
             for i in range(opcount)]
842 be760ba8 Michael Hanselmann
843 be760ba8 Michael Hanselmann
      # Create job
844 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
845 be760ba8 Michael Hanselmann
846 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
847 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
848 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
849 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
850 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
851 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
852 be760ba8 Michael Hanselmann
853 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
854 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
855 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
856 ebb2a2a3 Michael Hanselmann
857 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
858 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
860 be760ba8 Michael Hanselmann
861 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
862 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
863 be760ba8 Michael Hanselmann
        self.assertFalse(success)
864 be760ba8 Michael Hanselmann
865 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
866 be760ba8 Michael Hanselmann
867 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
868 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
869 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
870 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
871 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
872 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
873 be760ba8 Michael Hanselmann
          # Last opcode
874 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
875 be760ba8 Michael Hanselmann
        else:
876 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
877 be760ba8 Michael Hanselmann
878 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
880 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
881 be760ba8 Michael Hanselmann
882 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
883 ebb2a2a3 Michael Hanselmann
884 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
885 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
886 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
887 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
888 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
889 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
890 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
891 be760ba8 Michael Hanselmann
                              for op in job.ops))
892 be760ba8 Michael Hanselmann
893 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
894 be760ba8 Michael Hanselmann
895 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
896 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
897 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
898 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
899 be760ba8 Michael Hanselmann
900 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
901 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
902 be760ba8 Michael Hanselmann
903 be760ba8 Michael Hanselmann
    testdata = [
904 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
905 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
906 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
907 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
908 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
909 be760ba8 Michael Hanselmann
      ]
910 be760ba8 Michael Hanselmann
911 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
912 be760ba8 Michael Hanselmann
      # Prepare opcodes
913 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
914 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
915 be760ba8 Michael Hanselmann
                                       i <= failto))
916 be760ba8 Michael Hanselmann
             for i in range(opcount)]
917 be760ba8 Michael Hanselmann
918 be760ba8 Michael Hanselmann
      # Create job
919 a06c6ae8 Michael Hanselmann
      job = self._CreateJob(queue, str(job_id), ops)
920 be760ba8 Michael Hanselmann
921 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
922 be760ba8 Michael Hanselmann
923 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
924 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
925 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
926 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
927 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
928 ebb2a2a3 Michael Hanselmann
        # waitlock to running
929 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
930 ebb2a2a3 Michael Hanselmann
        # Opcode result
931 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
932 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
933 be760ba8 Michael Hanselmann
934 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
935 be760ba8 Michael Hanselmann
          # Last opcode
936 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
937 be760ba8 Michael Hanselmann
          break
938 be760ba8 Michael Hanselmann
939 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
940 be760ba8 Michael Hanselmann
941 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942 be760ba8 Michael Hanselmann
943 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
944 ebb2a2a3 Michael Hanselmann
945 be760ba8 Michael Hanselmann
      # Check job status
946 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
947 76b62028 Iustin Pop
      self.assertEqual(job.GetInfo(["id"]), [job_id])
948 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
949 be760ba8 Michael Hanselmann
950 be760ba8 Michael Hanselmann
      # Check opcode status
951 be760ba8 Michael Hanselmann
      data = zip(job.ops,
952 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
953 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
954 be760ba8 Michael Hanselmann
955 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
956 be760ba8 Michael Hanselmann
        if idx < failfrom:
957 be760ba8 Michael Hanselmann
          assert not op.input.fail
958 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
959 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
960 be760ba8 Michael Hanselmann
        elif idx <= failto:
961 be760ba8 Michael Hanselmann
          assert op.input.fail
962 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
963 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
964 be760ba8 Michael Hanselmann
        else:
965 be760ba8 Michael Hanselmann
          assert not op.input.fail
966 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
967 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
968 be760ba8 Michael Hanselmann
969 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
970 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
971 be760ba8 Michael Hanselmann
972 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
973 be760ba8 Michael Hanselmann
974 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
975 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
976 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
977 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
978 be760ba8 Michael Hanselmann
979 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
980 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
981 be760ba8 Michael Hanselmann
982 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
983 be760ba8 Michael Hanselmann
           for i in range(5)]
984 be760ba8 Michael Hanselmann
985 be760ba8 Michael Hanselmann
    # Create job
986 be760ba8 Michael Hanselmann
    job_id = 17045
987 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
988 be760ba8 Michael Hanselmann
989 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
990 be760ba8 Michael Hanselmann
991 be760ba8 Michael Hanselmann
    # Mark as cancelled
992 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
993 be760ba8 Michael Hanselmann
    self.assert_(success)
994 be760ba8 Michael Hanselmann
995 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
996 ebb2a2a3 Michael Hanselmann
997 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
998 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
999 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
1000 be760ba8 Michael Hanselmann
                            for op in job.ops))
1001 be760ba8 Michael Hanselmann
1002 66bd7445 Michael Hanselmann
    # Serialize to check for differences
1003 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
1004 66bd7445 Michael Hanselmann
1005 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
1006 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1007 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1008 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1009 30c945d0 Michael Hanselmann
1010 30c945d0 Michael Hanselmann
    # Check result
1011 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1012 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1013 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1014 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
1015 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1016 30c945d0 Michael Hanselmann
                                for op in job.ops))
1017 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1018 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1019 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1020 30c945d0 Michael Hanselmann
1021 66bd7445 Michael Hanselmann
    # Must not have changed or written
1022 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
1023 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1024 66bd7445 Michael Hanselmann
1025 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
1026 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
1027 30c945d0 Michael Hanselmann
1028 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1029 30c945d0 Michael Hanselmann
           for i in range(5)]
1030 30c945d0 Michael Hanselmann
1031 30c945d0 Michael Hanselmann
    # Create job
1032 30c945d0 Michael Hanselmann
    job_id = 8645
1033 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1034 30c945d0 Michael Hanselmann
1035 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1036 30c945d0 Michael Hanselmann
1037 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1038 30c945d0 Michael Hanselmann
1039 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
1040 30c945d0 Michael Hanselmann
1041 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1042 30c945d0 Michael Hanselmann
1043 30c945d0 Michael Hanselmann
    # Mark as cancelling
1044 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
1045 30c945d0 Michael Hanselmann
    self.assert_(success)
1046 30c945d0 Michael Hanselmann
1047 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1048 30c945d0 Michael Hanselmann
1049 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1050 30c945d0 Michael Hanselmann
                            for op in job.ops))
1051 30c945d0 Michael Hanselmann
1052 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1053 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1054 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1055 be760ba8 Michael Hanselmann
1056 be760ba8 Michael Hanselmann
    # Check result
1057 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1058 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1059 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1060 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1061 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1062 be760ba8 Michael Hanselmann
                                for op in job.ops))
1063 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1064 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1065 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1066 be760ba8 Michael Hanselmann
1067 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
1068 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1069 be760ba8 Michael Hanselmann
1070 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1071 be760ba8 Michael Hanselmann
           for i in range(5)]
1072 be760ba8 Michael Hanselmann
1073 be760ba8 Michael Hanselmann
    # Create job
1074 be760ba8 Michael Hanselmann
    job_id = 11009
1075 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1076 be760ba8 Michael Hanselmann
1077 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1078 be760ba8 Michael Hanselmann
1079 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1080 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1081 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1082 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1083 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1084 be760ba8 Michael Hanselmann
1085 be760ba8 Michael Hanselmann
      # Mark as cancelled
1086 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
1087 be760ba8 Michael Hanselmann
      self.assert_(success)
1088 be760ba8 Michael Hanselmann
1089 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1090 be760ba8 Michael Hanselmann
                              for op in job.ops))
1091 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1092 be760ba8 Michael Hanselmann
1093 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1094 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1095 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1096 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1097 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1098 be760ba8 Michael Hanselmann
1099 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1100 be760ba8 Michael Hanselmann
1101 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1102 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1103 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1104 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1105 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1106 be760ba8 Michael Hanselmann
1107 be760ba8 Michael Hanselmann
    # Check result
1108 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1109 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1110 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
1111 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1112 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1113 be760ba8 Michael Hanselmann
                                for op in job.ops))
1114 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1115 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1116 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1117 be760ba8 Michael Hanselmann
1118 942e2262 Michael Hanselmann
  def _TestCancelWhileSomething(self, cb):
1119 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
1120 9e49dfc5 Michael Hanselmann
1121 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1122 9e49dfc5 Michael Hanselmann
           for i in range(5)]
1123 9e49dfc5 Michael Hanselmann
1124 9e49dfc5 Michael Hanselmann
    # Create job
1125 9e49dfc5 Michael Hanselmann
    job_id = 24314
1126 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1127 9e49dfc5 Michael Hanselmann
1128 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1129 9e49dfc5 Michael Hanselmann
1130 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1131 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1132 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1133 9e49dfc5 Michael Hanselmann
1134 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
1135 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
1136 9e49dfc5 Michael Hanselmann
      self.assert_(success)
1137 9e49dfc5 Michael Hanselmann
1138 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1139 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
1140 9e49dfc5 Michael Hanselmann
1141 942e2262 Michael Hanselmann
      cb(queue)
1142 9e49dfc5 Michael Hanselmann
1143 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
1144 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
1145 9e49dfc5 Michael Hanselmann
1146 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1147 9e49dfc5 Michael Hanselmann
1148 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1149 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1150 9e49dfc5 Michael Hanselmann
1151 9e49dfc5 Michael Hanselmann
    # Check result
1152 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1153 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1154 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
1155 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
1156 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1157 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
1158 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1159 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1160 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1161 9e49dfc5 Michael Hanselmann
1162 942e2262 Michael Hanselmann
    return queue
1163 942e2262 Michael Hanselmann
1164 942e2262 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
1165 942e2262 Michael Hanselmann
    def fn(_):
1166 942e2262 Michael Hanselmann
      # Fake an acquire attempt timing out
1167 942e2262 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1168 942e2262 Michael Hanselmann
1169 942e2262 Michael Hanselmann
    self._TestCancelWhileSomething(fn)
1170 942e2262 Michael Hanselmann
1171 942e2262 Michael Hanselmann
  def testCancelDuringQueueShutdown(self):
1172 942e2262 Michael Hanselmann
    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1173 942e2262 Michael Hanselmann
    self.assertFalse(queue.AcceptingJobsUnlocked())
1174 942e2262 Michael Hanselmann
1175 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
1176 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
1177 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1178 be760ba8 Michael Hanselmann
1179 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1180 be760ba8 Michael Hanselmann
           for i in range(3)]
1181 be760ba8 Michael Hanselmann
1182 be760ba8 Michael Hanselmann
    # Create job
1183 76b62028 Iustin Pop
    job_id = 28492
1184 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1185 be760ba8 Michael Hanselmann
1186 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1187 be760ba8 Michael Hanselmann
1188 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1189 be760ba8 Michael Hanselmann
1190 be760ba8 Michael Hanselmann
    # Run one opcode
1191 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1192 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1193 be760ba8 Michael Hanselmann
1194 be760ba8 Michael Hanselmann
    # Job goes back to queued
1195 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1196 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1197 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1198 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1199 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1200 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
1201 be760ba8 Michael Hanselmann
1202 be760ba8 Michael Hanselmann
    # Mark as cancelled
1203 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
1204 be760ba8 Michael Hanselmann
    self.assert_(success)
1205 be760ba8 Michael Hanselmann
1206 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
1207 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1208 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1209 be760ba8 Michael Hanselmann
1210 be760ba8 Michael Hanselmann
    # Check result
1211 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1212 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
1213 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1214 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1215 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1216 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1217 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1218 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1219 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
1220 be760ba8 Michael Hanselmann
1221 942e2262 Michael Hanselmann
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
1222 942e2262 Michael Hanselmann
    self.assertTrue(queue.AcceptingJobsUnlocked())
1223 942e2262 Michael Hanselmann
1224 942e2262 Michael Hanselmann
    # Simulate shutdown
1225 942e2262 Michael Hanselmann
    queue.StopAcceptingJobs()
1226 942e2262 Michael Hanselmann
1227 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1228 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1229 942e2262 Michael Hanselmann
1230 942e2262 Michael Hanselmann
    # Check result
1231 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1232 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1233 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1234 942e2262 Michael Hanselmann
    self.assertTrue(job.start_timestamp)
1235 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1236 942e2262 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1237 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1238 942e2262 Michael Hanselmann
                               for op in job.ops[:runcount]))
1239 942e2262 Michael Hanselmann
    self.assertFalse(job.ops[runcount].end_timestamp)
1240 942e2262 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1241 942e2262 Michael Hanselmann
                                for op in job.ops[(runcount + 1):]))
1242 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1243 942e2262 Michael Hanselmann
                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
1244 942e2262 Michael Hanselmann
                       ([constants.OP_STATUS_QUEUED] *
1245 942e2262 Michael Hanselmann
                        (len(job.ops) - runcount))),
1246 942e2262 Michael Hanselmann
                      (["Res%s" % i for i in range(runcount)] +
1247 942e2262 Michael Hanselmann
                       ([None] * (len(job.ops) - runcount)))])
1248 942e2262 Michael Hanselmann
1249 942e2262 Michael Hanselmann
    # Must have been written and replicated
1250 942e2262 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1251 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1252 942e2262 Michael Hanselmann
1253 942e2262 Michael Hanselmann
  def testQueueShutdownWhileRunning(self):
1254 942e2262 Michael Hanselmann
    # Tests shutting down the queue while a job is running
1255 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1256 942e2262 Michael Hanselmann
1257 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1258 942e2262 Michael Hanselmann
           for i in range(3)]
1259 942e2262 Michael Hanselmann
1260 942e2262 Michael Hanselmann
    # Create job
1261 942e2262 Michael Hanselmann
    job_id = 2718211587
1262 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1263 942e2262 Michael Hanselmann
1264 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1265 942e2262 Michael Hanselmann
1266 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1267 942e2262 Michael Hanselmann
1268 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1269 942e2262 Michael Hanselmann
1270 942e2262 Michael Hanselmann
    # Run one opcode
1271 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1272 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1273 942e2262 Michael Hanselmann
1274 942e2262 Michael Hanselmann
    # Job goes back to queued
1275 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1276 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1277 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1278 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1279 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1280 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1281 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1282 942e2262 Michael Hanselmann
1283 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1284 942e2262 Michael Hanselmann
    for _ in range(3):
1285 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1286 942e2262 Michael Hanselmann
1287 942e2262 Michael Hanselmann
    # Run second opcode
1288 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1289 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1290 942e2262 Michael Hanselmann
1291 942e2262 Michael Hanselmann
    # Job goes back to queued
1292 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1293 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1294 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1295 942e2262 Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
1296 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1297 942e2262 Michael Hanselmann
                      ["Res0", "Res1", None]])
1298 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1299 942e2262 Michael Hanselmann
1300 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1301 942e2262 Michael Hanselmann
    for _ in range(3):
1302 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1303 942e2262 Michael Hanselmann
1304 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 2)
1305 942e2262 Michael Hanselmann
1306 942e2262 Michael Hanselmann
  def testQueueShutdownWithLockTimeout(self):
1307 942e2262 Michael Hanselmann
    # Tests shutting down while a lock acquire times out
1308 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1309 942e2262 Michael Hanselmann
1310 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1311 942e2262 Michael Hanselmann
           for i in range(3)]
1312 942e2262 Michael Hanselmann
1313 942e2262 Michael Hanselmann
    # Create job
1314 942e2262 Michael Hanselmann
    job_id = 1304231178
1315 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1316 942e2262 Michael Hanselmann
1317 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1318 942e2262 Michael Hanselmann
1319 942e2262 Michael Hanselmann
    acquire_timeout = False
1320 942e2262 Michael Hanselmann
1321 942e2262 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1322 942e2262 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1323 942e2262 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1324 942e2262 Michael Hanselmann
      if acquire_timeout:
1325 942e2262 Michael Hanselmann
        raise mcpu.LockAcquireTimeout()
1326 942e2262 Michael Hanselmann
1327 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1328 942e2262 Michael Hanselmann
1329 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1330 942e2262 Michael Hanselmann
1331 942e2262 Michael Hanselmann
    # Run one opcode
1332 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1333 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1334 942e2262 Michael Hanselmann
1335 942e2262 Michael Hanselmann
    # Job goes back to queued
1336 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1337 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1338 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1339 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1340 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1341 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1342 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1343 942e2262 Michael Hanselmann
1344 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1345 942e2262 Michael Hanselmann
    for _ in range(3):
1346 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1347 942e2262 Michael Hanselmann
1348 942e2262 Michael Hanselmann
    # The next opcode should have expiring lock acquires
1349 942e2262 Michael Hanselmann
    acquire_timeout = True
1350 942e2262 Michael Hanselmann
1351 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 1)
1352 942e2262 Michael Hanselmann
1353 942e2262 Michael Hanselmann
  def testQueueShutdownWhileInQueue(self):
1354 942e2262 Michael Hanselmann
    # This should never happen in reality (no new jobs are started by the
1355 942e2262 Michael Hanselmann
    # workerpool once a shutdown has been initiated), but it's better to test
1356 942e2262 Michael Hanselmann
    # the job processor for this scenario
1357 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1358 942e2262 Michael Hanselmann
1359 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1360 942e2262 Michael Hanselmann
           for i in range(5)]
1361 942e2262 Michael Hanselmann
1362 942e2262 Michael Hanselmann
    # Create job
1363 942e2262 Michael Hanselmann
    job_id = 2031
1364 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1365 942e2262 Michael Hanselmann
1366 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1367 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1368 942e2262 Michael Hanselmann
1369 942e2262 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1370 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1371 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1372 942e2262 Michael Hanselmann
                               for op in job.ops))
1373 942e2262 Michael Hanselmann
1374 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1375 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1376 942e2262 Michael Hanselmann
1377 942e2262 Michael Hanselmann
  def testQueueShutdownWhileWaitlockInQueue(self):
1378 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1379 942e2262 Michael Hanselmann
1380 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1381 942e2262 Michael Hanselmann
           for i in range(5)]
1382 942e2262 Michael Hanselmann
1383 942e2262 Michael Hanselmann
    # Create job
1384 942e2262 Michael Hanselmann
    job_id = 53125685
1385 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1386 942e2262 Michael Hanselmann
1387 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1388 942e2262 Michael Hanselmann
1389 942e2262 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1390 942e2262 Michael Hanselmann
1391 942e2262 Michael Hanselmann
    assert len(job.ops) == 5
1392 942e2262 Michael Hanselmann
1393 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1394 942e2262 Michael Hanselmann
1395 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1396 942e2262 Michael Hanselmann
1397 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1398 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1399 942e2262 Michael Hanselmann
1400 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
1401 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
1402 be760ba8 Michael Hanselmann
    # program was restarted
1403 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1404 be760ba8 Michael Hanselmann
1405 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1406 be760ba8 Michael Hanselmann
1407 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1408 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1409 be760ba8 Michael Hanselmann
             for i in range(10)]
1410 be760ba8 Michael Hanselmann
1411 be760ba8 Michael Hanselmann
      # Create job
1412 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
1413 be760ba8 Michael Hanselmann
1414 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1415 be760ba8 Michael Hanselmann
1416 be760ba8 Michael Hanselmann
      for _ in range(successcount):
1417 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1418 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
1419 be760ba8 Michael Hanselmann
1420 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1421 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
1422 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
1423 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
1424 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
1425 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
1426 be760ba8 Michael Hanselmann
1427 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
1428 be760ba8 Michael Hanselmann
1429 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
1430 8a3cd185 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1431 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
1432 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
1433 be760ba8 Michael Hanselmann
1434 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1435 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1436 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1437 be760ba8 Michael Hanselmann
1438 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1439 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1440 be760ba8 Michael Hanselmann
1441 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1442 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1443 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1444 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1445 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1446 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1447 be760ba8 Michael Hanselmann
1448 be760ba8 Michael Hanselmann
      if remaining == 0:
1449 be760ba8 Michael Hanselmann
        # Last opcode
1450 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1451 be760ba8 Michael Hanselmann
        break
1452 be760ba8 Michael Hanselmann
1453 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1454 be760ba8 Michael Hanselmann
1455 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1456 be760ba8 Michael Hanselmann
1457 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1458 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1459 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1460 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1461 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1462 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1463 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1464 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1465 be760ba8 Michael Hanselmann
                            for op in job.ops))
1466 be760ba8 Michael Hanselmann
1467 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1468 be760ba8 Michael Hanselmann
1469 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1470 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1471 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1472 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1473 be760ba8 Michael Hanselmann
1474 be760ba8 Michael Hanselmann
    # ... also after being restored
1475 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1476 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1477 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1478 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1479 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1480 be760ba8 Michael Hanselmann
1481 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1482 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1483 be760ba8 Michael Hanselmann
1484 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1485 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1486 be760ba8 Michael Hanselmann
1487 be760ba8 Michael Hanselmann
    # Create job
1488 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1489 be760ba8 Michael Hanselmann
1490 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1491 be760ba8 Michael Hanselmann
1492 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1493 be760ba8 Michael Hanselmann
1494 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1495 be760ba8 Michael Hanselmann
1496 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1497 be760ba8 Michael Hanselmann
1498 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1499 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1500 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1501 be760ba8 Michael Hanselmann
1502 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1503 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1504 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1505 be760ba8 Michael Hanselmann
1506 be760ba8 Michael Hanselmann
    messages = {
1507 be760ba8 Michael Hanselmann
      1: [
1508 be760ba8 Michael Hanselmann
        (None, "Hello"),
1509 be760ba8 Michael Hanselmann
        (None, "World"),
1510 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1511 be760ba8 Michael Hanselmann
        ],
1512 be760ba8 Michael Hanselmann
      4: [
1513 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1514 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1515 be760ba8 Michael Hanselmann
        ],
1516 be760ba8 Michael Hanselmann
      }
1517 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1518 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1519 be760ba8 Michael Hanselmann
           for i in range(5)]
1520 be760ba8 Michael Hanselmann
1521 be760ba8 Michael Hanselmann
    # Create job
1522 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1523 be760ba8 Michael Hanselmann
1524 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1525 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1526 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1527 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1528 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1529 be760ba8 Michael Hanselmann
1530 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1531 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1532 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1533 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1534 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1535 be760ba8 Michael Hanselmann
1536 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1537 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1538 be760ba8 Michael Hanselmann
1539 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1540 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1541 be760ba8 Michael Hanselmann
        if log_type:
1542 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1543 be760ba8 Michael Hanselmann
        else:
1544 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1545 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1546 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1547 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1548 be760ba8 Michael Hanselmann
1549 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1550 be760ba8 Michael Hanselmann
1551 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1552 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1553 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1554 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1555 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1556 be760ba8 Michael Hanselmann
1557 be760ba8 Michael Hanselmann
      if remaining == 0:
1558 be760ba8 Michael Hanselmann
        # Last opcode
1559 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1560 be760ba8 Michael Hanselmann
        break
1561 be760ba8 Michael Hanselmann
1562 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1563 be760ba8 Michael Hanselmann
1564 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1565 be760ba8 Michael Hanselmann
1566 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1567 ebb2a2a3 Michael Hanselmann
1568 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1569 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1570 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1571 be760ba8 Michael Hanselmann
1572 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1573 be760ba8 Michael Hanselmann
1574 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1575 be760ba8 Michael Hanselmann
1576 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1577 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1578 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1579 be760ba8 Michael Hanselmann
1580 be760ba8 Michael Hanselmann
    # Check each message
1581 be760ba8 Michael Hanselmann
    prevserial = -1
1582 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1583 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1584 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1585 be760ba8 Michael Hanselmann
        if exptype:
1586 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1587 be760ba8 Michael Hanselmann
        else:
1588 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1589 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1590 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1591 be760ba8 Michael Hanselmann
        prevserial = serial
1592 be760ba8 Michael Hanselmann
1593 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1594 be760ba8 Michael Hanselmann
    # Check serial
1595 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1596 be760ba8 Michael Hanselmann
1597 be760ba8 Michael Hanselmann
    # No filter
1598 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1599 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1600 be760ba8 Michael Hanselmann
                      for entry in entries])
1601 be760ba8 Michael Hanselmann
1602 be760ba8 Michael Hanselmann
    # Filter with serial
1603 be760ba8 Michael Hanselmann
    assert count > 3
1604 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1605 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1606 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1607 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1608 be760ba8 Michael Hanselmann
1609 be760ba8 Michael Hanselmann
    # No log message after highest serial
1610 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1611 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1612 be760ba8 Michael Hanselmann
1613 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1614 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1615 6a373640 Michael Hanselmann
1616 6a373640 Michael Hanselmann
    job_id = 15656
1617 6a373640 Michael Hanselmann
    ops = [
1618 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1619 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1620 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1621 6a373640 Michael Hanselmann
                          submit_jobs=[
1622 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1623 6a373640 Michael Hanselmann
                            ]),
1624 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1625 6a373640 Michael Hanselmann
                          submit_jobs=[
1626 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1627 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1628 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1629 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1630 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1631 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1632 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1633 6a373640 Michael Hanselmann
                            ]),
1634 6a373640 Michael Hanselmann
      ]
1635 6a373640 Michael Hanselmann
1636 6a373640 Michael Hanselmann
    # Create job
1637 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1638 6a373640 Michael Hanselmann
1639 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1640 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1641 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1642 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1643 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1644 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1645 6a373640 Michael Hanselmann
1646 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1647 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1648 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1649 6a373640 Michael Hanselmann
1650 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1651 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1652 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1653 6a373640 Michael Hanselmann
1654 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1655 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1656 6a373640 Michael Hanselmann
      self.assertFalse(success)
1657 6a373640 Michael Hanselmann
1658 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1659 6a373640 Michael Hanselmann
1660 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1661 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1662 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1663 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1664 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1665 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1666 6a373640 Michael Hanselmann
        # Last opcode
1667 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1668 6a373640 Michael Hanselmann
      else:
1669 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1670 6a373640 Michael Hanselmann
1671 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1672 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1673 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1674 6a373640 Michael Hanselmann
1675 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1676 6a373640 Michael Hanselmann
1677 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1678 6a373640 Michael Hanselmann
                                        for op in ops
1679 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1680 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1681 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1682 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1683 6a373640 Michael Hanselmann
1684 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1685 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1686 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1687 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1688 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1689 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1690 6a373640 Michael Hanselmann
1691 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1692 6a373640 Michael Hanselmann
1693 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1694 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1695 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1696 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1697 6a373640 Michael Hanselmann
1698 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1699 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1700 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1701 b95479a5 Michael Hanselmann
1702 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1703 b95479a5 Michael Hanselmann
1704 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1705 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1706 b95479a5 Michael Hanselmann
    job_id = 29929
1707 b95479a5 Michael Hanselmann
    ops = [
1708 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1709 b95479a5 Michael Hanselmann
                          depends=[
1710 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1711 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1712 b95479a5 Michael Hanselmann
                            ]),
1713 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1714 b95479a5 Michael Hanselmann
      ]
1715 b95479a5 Michael Hanselmann
1716 b95479a5 Michael Hanselmann
    # Create job
1717 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1718 b95479a5 Michael Hanselmann
1719 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1720 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1721 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1722 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1723 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1724 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1725 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1726 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1727 b95479a5 Michael Hanselmann
1728 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1729 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1730 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1731 b95479a5 Michael Hanselmann
1732 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1733 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1734 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1735 b95479a5 Michael Hanselmann
1736 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1737 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1738 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1739 b95479a5 Michael Hanselmann
1740 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1741 b95479a5 Michael Hanselmann
1742 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1743 b95479a5 Michael Hanselmann
1744 b95479a5 Michael Hanselmann
    counter = itertools.count()
1745 b95479a5 Michael Hanselmann
    while True:
1746 b95479a5 Michael Hanselmann
      attempt = counter.next()
1747 b95479a5 Michael Hanselmann
1748 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1749 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1750 b95479a5 Michael Hanselmann
1751 b95479a5 Michael Hanselmann
      if attempt < 2:
1752 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1753 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1754 b95479a5 Michael Hanselmann
      elif attempt == 2:
1755 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1756 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1757 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1758 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1759 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1760 b95479a5 Michael Hanselmann
      elif attempt < 5:
1761 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1762 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1763 b95479a5 Michael Hanselmann
      elif attempt == 5:
1764 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1765 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1766 b95479a5 Michael Hanselmann
      if attempt == 2:
1767 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1768 b95479a5 Michael Hanselmann
      elif attempt > 5:
1769 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1770 b95479a5 Michael Hanselmann
      else:
1771 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1772 b95479a5 Michael Hanselmann
1773 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1774 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1775 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1776 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1777 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1778 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1779 b95479a5 Michael Hanselmann
1780 b95479a5 Michael Hanselmann
      if attempt < 5:
1781 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1782 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1783 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1784 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1785 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1786 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1787 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1788 b95479a5 Michael Hanselmann
        continue
1789 b95479a5 Michael Hanselmann
1790 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1791 b95479a5 Michael Hanselmann
        # Last opcode
1792 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1793 b95479a5 Michael Hanselmann
        break
1794 b95479a5 Michael Hanselmann
1795 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1796 b95479a5 Michael Hanselmann
1797 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1798 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1799 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1800 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1801 b95479a5 Michael Hanselmann
1802 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1803 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1804 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1805 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1806 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1807 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1808 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1809 b95479a5 Michael Hanselmann
                               for op in job.ops))
1810 b95479a5 Michael Hanselmann
1811 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1812 b95479a5 Michael Hanselmann
1813 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1814 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1815 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1816 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1817 b95479a5 Michael Hanselmann
1818 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1819 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1820 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1821 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1822 b95479a5 Michael Hanselmann
1823 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1824 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1825 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1826 b95479a5 Michael Hanselmann
1827 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1828 b95479a5 Michael Hanselmann
1829 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1830 b95479a5 Michael Hanselmann
    job_id = 30876
1831 b95479a5 Michael Hanselmann
    ops = [
1832 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1833 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1834 b95479a5 Michael Hanselmann
                          depends=[
1835 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1836 b95479a5 Michael Hanselmann
                            ]),
1837 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1838 b95479a5 Michael Hanselmann
      ]
1839 b95479a5 Michael Hanselmann
1840 b95479a5 Michael Hanselmann
    # Create job
1841 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1842 b95479a5 Michael Hanselmann
1843 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1844 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1845 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1846 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1847 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1848 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1849 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1850 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1851 b95479a5 Michael Hanselmann
1852 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1853 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1854 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1855 b95479a5 Michael Hanselmann
1856 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1857 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1858 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1859 b95479a5 Michael Hanselmann
1860 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1861 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1862 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1863 b95479a5 Michael Hanselmann
1864 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1865 b95479a5 Michael Hanselmann
1866 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1867 b95479a5 Michael Hanselmann
1868 b95479a5 Michael Hanselmann
    counter = itertools.count()
1869 b95479a5 Michael Hanselmann
    while True:
1870 b95479a5 Michael Hanselmann
      attempt = counter.next()
1871 b95479a5 Michael Hanselmann
1872 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1873 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1874 b95479a5 Michael Hanselmann
1875 b95479a5 Michael Hanselmann
      if attempt == 0:
1876 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1877 b95479a5 Michael Hanselmann
        pass
1878 b95479a5 Michael Hanselmann
      elif attempt < 4:
1879 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1880 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1881 b95479a5 Michael Hanselmann
      elif attempt == 4:
1882 b95479a5 Michael Hanselmann
        # Other job was cancelled
1883 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1884 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1885 b95479a5 Michael Hanselmann
1886 b95479a5 Michael Hanselmann
      if attempt == 0:
1887 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1888 b95479a5 Michael Hanselmann
      else:
1889 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1890 b95479a5 Michael Hanselmann
1891 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1892 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1893 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1894 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1895 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1896 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1897 b95479a5 Michael Hanselmann
1898 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1899 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1900 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1901 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1902 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1903 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1904 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1905 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1906 b95479a5 Michael Hanselmann
        continue
1907 b95479a5 Michael Hanselmann
1908 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1909 b95479a5 Michael Hanselmann
        # Last opcode
1910 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1911 b95479a5 Michael Hanselmann
        break
1912 b95479a5 Michael Hanselmann
1913 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1914 b95479a5 Michael Hanselmann
1915 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1916 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1917 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1918 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1919 b95479a5 Michael Hanselmann
1920 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1921 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1922 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1923 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1924 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1925 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1926 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1927 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1928 b95479a5 Michael Hanselmann
1929 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1930 b95479a5 Michael Hanselmann
1931 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1932 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1933 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1934 b95479a5 Michael Hanselmann
1935 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1936 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1937 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1938 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1939 b95479a5 Michael Hanselmann
1940 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1941 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1942 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1943 b95479a5 Michael Hanselmann
1944 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1945 b95479a5 Michael Hanselmann
1946 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1947 b95479a5 Michael Hanselmann
    job_id = 11763
1948 b95479a5 Michael Hanselmann
    ops = [
1949 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1950 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1951 b95479a5 Michael Hanselmann
                          depends=[
1952 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1953 b95479a5 Michael Hanselmann
                            ]),
1954 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1955 b95479a5 Michael Hanselmann
      ]
1956 b95479a5 Michael Hanselmann
1957 b95479a5 Michael Hanselmann
    # Create job
1958 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1959 b95479a5 Michael Hanselmann
1960 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1961 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1962 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1963 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1964 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1965 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1966 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1967 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1968 b95479a5 Michael Hanselmann
1969 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1970 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1971 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1972 b95479a5 Michael Hanselmann
1973 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1974 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1975 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1976 b95479a5 Michael Hanselmann
1977 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1978 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1979 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1980 b95479a5 Michael Hanselmann
1981 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1982 b95479a5 Michael Hanselmann
1983 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1984 b95479a5 Michael Hanselmann
1985 b95479a5 Michael Hanselmann
    counter = itertools.count()
1986 b95479a5 Michael Hanselmann
    while True:
1987 b95479a5 Michael Hanselmann
      attempt = counter.next()
1988 b95479a5 Michael Hanselmann
1989 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1990 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1991 b95479a5 Michael Hanselmann
1992 b95479a5 Michael Hanselmann
      if attempt == 0:
1993 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1994 b95479a5 Michael Hanselmann
        pass
1995 b95479a5 Michael Hanselmann
      elif attempt < 4:
1996 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1997 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1998 b95479a5 Michael Hanselmann
      elif attempt == 4:
1999 b95479a5 Michael Hanselmann
        # Other job failed
2000 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
2001 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
2002 b95479a5 Michael Hanselmann
2003 b95479a5 Michael Hanselmann
      if attempt == 0:
2004 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
2005 b95479a5 Michael Hanselmann
      else:
2006 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
2007 b95479a5 Michael Hanselmann
2008 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
2009 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
2010 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
2011 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
2012 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
2013 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
2014 b95479a5 Michael Hanselmann
2015 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
2016 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
2017 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
2018 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
2019 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2020 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
2021 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
2022 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
2023 b95479a5 Michael Hanselmann
        continue
2024 b95479a5 Michael Hanselmann
2025 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2026 b95479a5 Michael Hanselmann
        # Last opcode
2027 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2028 b95479a5 Michael Hanselmann
        break
2029 b95479a5 Michael Hanselmann
2030 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
2031 b95479a5 Michael Hanselmann
2032 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2033 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2034 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
2035 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2036 b95479a5 Michael Hanselmann
2037 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
2038 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
2039 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2040 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2041 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
2042 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
2043 b95479a5 Michael Hanselmann
2044 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
2045 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
2046 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
2047 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
2048 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
2049 b95479a5 Michael Hanselmann
2050 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
2051 b95479a5 Michael Hanselmann
2052 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2053 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2054 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
2055 b95479a5 Michael Hanselmann
2056 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2057 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2058 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2059 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2060 b95479a5 Michael Hanselmann
2061 be760ba8 Michael Hanselmann
2062 df5a5730 Michael Hanselmann
class TestEvaluateJobProcessorResult(unittest.TestCase):
2063 df5a5730 Michael Hanselmann
  def testFinished(self):
2064 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2065 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(30953)
2066 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2067 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.FINISHED)
2068 df5a5730 Michael Hanselmann
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2069 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2070 df5a5730 Michael Hanselmann
2071 df5a5730 Michael Hanselmann
  def testDefer(self):
2072 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2073 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(11326, priority=5463)
2074 df5a5730 Michael Hanselmann
    try:
2075 df5a5730 Michael Hanselmann
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2076 df5a5730 Michael Hanselmann
                                         jqueue._JobProcessor.DEFER)
2077 df5a5730 Michael Hanselmann
    except workerpool.DeferTask, err:
2078 df5a5730 Michael Hanselmann
      self.assertEqual(err.priority, 5463)
2079 df5a5730 Michael Hanselmann
    else:
2080 df5a5730 Michael Hanselmann
      self.fail("Didn't raise exception")
2081 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2082 df5a5730 Michael Hanselmann
2083 df5a5730 Michael Hanselmann
  def testWaitdep(self):
2084 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2085 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(21317)
2086 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2087 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.WAITDEP)
2088 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2089 df5a5730 Michael Hanselmann
2090 df5a5730 Michael Hanselmann
  def testOther(self):
2091 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2092 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(5813)
2093 df5a5730 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
2094 df5a5730 Michael Hanselmann
                      jqueue._EvaluateJobProcessorResult,
2095 df5a5730 Michael Hanselmann
                      depmgr, job, "Other result")
2096 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2097 df5a5730 Michael Hanselmann
2098 df5a5730 Michael Hanselmann
2099 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
2100 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
2101 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
2102 26d3fd2f Michael Hanselmann
    self.attempts = 0
2103 26d3fd2f Michael Hanselmann
    self.last_timeout = None
2104 26d3fd2f Michael Hanselmann
2105 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
2106 26d3fd2f Michael Hanselmann
    self.attempts += 1
2107 26d3fd2f Michael Hanselmann
    if self.timeouts:
2108 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
2109 26d3fd2f Michael Hanselmann
    else:
2110 26d3fd2f Michael Hanselmann
      timeout = None
2111 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
2112 26d3fd2f Michael Hanselmann
    return timeout
2113 26d3fd2f Michael Hanselmann
2114 26d3fd2f Michael Hanselmann
2115 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2116 26d3fd2f Michael Hanselmann
  def setUp(self):
2117 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
2118 26d3fd2f Michael Hanselmann
    self.job = None
2119 26d3fd2f Michael Hanselmann
    self.curop = None
2120 26d3fd2f Michael Hanselmann
    self.opcounter = None
2121 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None