Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.jqueue_unittest.py @ 7352d33b

History | View | Annotate | Download (96.3 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 76b62028 Iustin Pop
# Copyright (C) 2010, 2011, 2012 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 4679547e Michael Hanselmann
import operator
33 989a8bee Michael Hanselmann
34 383477e9 Michael Hanselmann
try:
35 383477e9 Michael Hanselmann
  # pylint: disable=E0611
36 383477e9 Michael Hanselmann
  from pyinotify import pyinotify
37 383477e9 Michael Hanselmann
except ImportError:
38 383477e9 Michael Hanselmann
  import pyinotify
39 383477e9 Michael Hanselmann
40 989a8bee Michael Hanselmann
from ganeti import constants
41 989a8bee Michael Hanselmann
from ganeti import utils
42 989a8bee Michael Hanselmann
from ganeti import errors
43 989a8bee Michael Hanselmann
from ganeti import jqueue
44 8f5c488d Michael Hanselmann
from ganeti import opcodes
45 8f5c488d Michael Hanselmann
from ganeti import compat
46 26d3fd2f Michael Hanselmann
from ganeti import mcpu
47 fcb21ad7 Michael Hanselmann
from ganeti import query
48 df5a5730 Michael Hanselmann
from ganeti import workerpool
49 989a8bee Michael Hanselmann
50 989a8bee Michael Hanselmann
import testutils
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
53 989a8bee Michael Hanselmann
class _FakeJob:
54 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
55 989a8bee Michael Hanselmann
    self.id = job_id
56 c0f6d0d8 Michael Hanselmann
    self.writable = False
57 989a8bee Michael Hanselmann
    self._status = status
58 989a8bee Michael Hanselmann
    self._log = []
59 989a8bee Michael Hanselmann
60 989a8bee Michael Hanselmann
  def SetStatus(self, status):
61 989a8bee Michael Hanselmann
    self._status = status
62 989a8bee Michael Hanselmann
63 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
64 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
65 989a8bee Michael Hanselmann
66 989a8bee Michael Hanselmann
  def CalcStatus(self):
67 989a8bee Michael Hanselmann
    return self._status
68 989a8bee Michael Hanselmann
69 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
70 989a8bee Michael Hanselmann
    result = []
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    for name in fields:
73 989a8bee Michael Hanselmann
      if name == "status":
74 989a8bee Michael Hanselmann
        result.append(self._status)
75 989a8bee Michael Hanselmann
      else:
76 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
    return result
79 989a8bee Michael Hanselmann
80 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
81 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
82 989a8bee Michael Hanselmann
83 989a8bee Michael Hanselmann
    if newer_than is None:
84 989a8bee Michael Hanselmann
      return self._log
85 989a8bee Michael Hanselmann
86 989a8bee Michael Hanselmann
    return self._log[newer_than:]
87 989a8bee Michael Hanselmann
88 989a8bee Michael Hanselmann
89 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
90 989a8bee Michael Hanselmann
  def testStatus(self):
91 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
92 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
93 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
94 989a8bee Michael Hanselmann
95 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
96 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
97 989a8bee Michael Hanselmann
98 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
99 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
100 989a8bee Michael Hanselmann
101 989a8bee Michael Hanselmann
    # job.id is used by checker
102 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
103 989a8bee Michael Hanselmann
104 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
105 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
106 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
107 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
108 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
109 989a8bee Michael Hanselmann
110 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
111 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112 989a8bee Michael Hanselmann
113 989a8bee Michael Hanselmann
  def testFinalStatus(self):
114 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
115 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
116 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
117 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
118 989a8bee Michael Hanselmann
      # a change immediately
119 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
120 989a8bee Michael Hanselmann
121 989a8bee Michael Hanselmann
  def testLog(self):
122 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
123 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
124 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
127 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
128 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
129 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
132 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
133 989a8bee Michael Hanselmann
134 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
135 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
136 989a8bee Michael Hanselmann
137 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
138 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
139 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
140 989a8bee Michael Hanselmann
141 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
142 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
143 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
144 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
145 989a8bee Michael Hanselmann
146 989a8bee Michael Hanselmann
147 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
148 989a8bee Michael Hanselmann
  def setUp(self):
149 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
150 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
151 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
152 989a8bee Michael Hanselmann
153 989a8bee Michael Hanselmann
  def tearDown(self):
154 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
155 989a8bee Michael Hanselmann
156 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
157 989a8bee Michael Hanselmann
    try:
158 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
159 989a8bee Michael Hanselmann
    except EnvironmentError, err:
160 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
161 989a8bee Michael Hanselmann
    else:
162 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
163 989a8bee Michael Hanselmann
164 989a8bee Michael Hanselmann
  def testClose(self):
165 989a8bee Michael Hanselmann
    for wait in [False, True]:
166 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
167 989a8bee Michael Hanselmann
      try:
168 989a8bee Michael Hanselmann
        if wait:
169 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
170 989a8bee Michael Hanselmann
      finally:
171 989a8bee Michael Hanselmann
        waiter.Close()
172 989a8bee Michael Hanselmann
173 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
174 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
175 989a8bee Michael Hanselmann
176 989a8bee Michael Hanselmann
  def testChangingFile(self):
177 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
178 989a8bee Michael Hanselmann
    try:
179 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
180 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
181 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
182 989a8bee Michael Hanselmann
    finally:
183 989a8bee Michael Hanselmann
      waiter.Close()
184 989a8bee Michael Hanselmann
185 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
186 989a8bee Michael Hanselmann
187 989a8bee Michael Hanselmann
  def testChangingFile2(self):
188 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
189 989a8bee Michael Hanselmann
    try:
190 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
191 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
192 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
193 989a8bee Michael Hanselmann
194 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
195 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
196 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
197 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
198 989a8bee Michael Hanselmann
    finally:
199 989a8bee Michael Hanselmann
      waiter.Close()
200 989a8bee Michael Hanselmann
201 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
202 989a8bee Michael Hanselmann
203 989a8bee Michael Hanselmann
204 383477e9 Michael Hanselmann
class _FailingWatchManager(pyinotify.WatchManager):
205 383477e9 Michael Hanselmann
  """Subclass of L{pyinotify.WatchManager} which always fails to register.
206 383477e9 Michael Hanselmann

207 383477e9 Michael Hanselmann
  """
208 383477e9 Michael Hanselmann
  def add_watch(self, filename, mask):
209 383477e9 Michael Hanselmann
    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
210 383477e9 Michael Hanselmann
                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
211 383477e9 Michael Hanselmann
212 383477e9 Michael Hanselmann
    return {
213 383477e9 Michael Hanselmann
      filename: -1,
214 383477e9 Michael Hanselmann
      }
215 383477e9 Michael Hanselmann
216 383477e9 Michael Hanselmann
217 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
218 989a8bee Michael Hanselmann
  def setUp(self):
219 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
220 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
221 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
222 989a8bee Michael Hanselmann
223 989a8bee Michael Hanselmann
  def tearDown(self):
224 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
227 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
228 989a8bee Michael Hanselmann
229 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
230 989a8bee Michael Hanselmann
    return None
231 989a8bee Michael Hanselmann
232 989a8bee Michael Hanselmann
  def testNoChanges(self):
233 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
234 989a8bee Michael Hanselmann
235 989a8bee Michael Hanselmann
    # No change
236 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
237 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
238 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
239 989a8bee Michael Hanselmann
240 989a8bee Michael Hanselmann
    # No previous information
241 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
242 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
243 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
244 989a8bee Michael Hanselmann
245 989a8bee Michael Hanselmann
  def testLostJob(self):
246 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
247 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
248 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
249 989a8bee Michael Hanselmann
250 383477e9 Michael Hanselmann
  def testNonExistentFile(self):
251 383477e9 Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
252 383477e9 Michael Hanselmann
253 383477e9 Michael Hanselmann
    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
254 383477e9 Michael Hanselmann
    self.assertFalse(os.path.exists(filename))
255 383477e9 Michael Hanselmann
256 383477e9 Michael Hanselmann
    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
257 383477e9 Michael Hanselmann
                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
258 383477e9 Michael Hanselmann
                                             _waiter_cls=NotImplemented))
259 383477e9 Michael Hanselmann
    self.assertTrue(result is None)
260 383477e9 Michael Hanselmann
261 383477e9 Michael Hanselmann
  def testInotifyError(self):
262 383477e9 Michael Hanselmann
    jobfile_waiter_cls = \
263 383477e9 Michael Hanselmann
      compat.partial(jqueue._JobFileChangesWaiter,
264 383477e9 Michael Hanselmann
                     _inotify_wm_cls=_FailingWatchManager)
265 383477e9 Michael Hanselmann
266 383477e9 Michael Hanselmann
    jobchange_waiter_cls = \
267 383477e9 Michael Hanselmann
      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
268 383477e9 Michael Hanselmann
269 383477e9 Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
270 383477e9 Michael Hanselmann
271 383477e9 Michael Hanselmann
    # Test if failing to watch a job file (e.g. due to
272 383477e9 Michael Hanselmann
    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
273 383477e9 Michael Hanselmann
    self.assertRaises(errors.InotifyError, wfjc,
274 383477e9 Michael Hanselmann
                      self.filename, self._LoadWaitingJob,
275 383477e9 Michael Hanselmann
                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
276 383477e9 Michael Hanselmann
                      _waiter_cls=jobchange_waiter_cls)
277 383477e9 Michael Hanselmann
278 989a8bee Michael Hanselmann
279 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
280 6760e4ed Michael Hanselmann
  def test(self):
281 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
282 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
283 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
284 6760e4ed Michael Hanselmann
285 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
286 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
287 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
288 6760e4ed Michael Hanselmann
289 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
290 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
291 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
292 6760e4ed Michael Hanselmann
293 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
294 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
295 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
296 6760e4ed Michael Hanselmann
297 6760e4ed Michael Hanselmann
298 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
299 8f5c488d Michael Hanselmann
  def testDefaults(self):
300 8f5c488d Michael Hanselmann
    def _Check(op):
301 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
302 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
303 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
304 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
305 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
306 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
307 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
308 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
309 8f5c488d Michael Hanselmann
310 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
311 8f5c488d Michael Hanselmann
    _Check(op1)
312 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
313 8f5c488d Michael Hanselmann
    _Check(op2)
314 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
315 8f5c488d Michael Hanselmann
316 8f5c488d Michael Hanselmann
  def testPriority(self):
317 8f5c488d Michael Hanselmann
    def _Check(op):
318 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
319 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
320 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
321 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
322 8f5c488d Michael Hanselmann
323 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
324 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
325 8f5c488d Michael Hanselmann
    _Check(op1)
326 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
327 8f5c488d Michael Hanselmann
    _Check(op2)
328 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
329 8f5c488d Michael Hanselmann
330 8f5c488d Michael Hanselmann
331 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
332 4679547e Michael Hanselmann
  def testNoOpCodes(self):
333 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
334 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
335 5f6b0b71 Michael Hanselmann
336 8f5c488d Michael Hanselmann
  def testDefaults(self):
337 8f5c488d Michael Hanselmann
    job_id = 4260
338 8f5c488d Michael Hanselmann
    ops = [
339 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
340 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
341 8f5c488d Michael Hanselmann
      ]
342 8f5c488d Michael Hanselmann
343 8f5c488d Michael Hanselmann
    def _Check(job):
344 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
345 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
346 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
347 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
348 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
349 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
350 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
351 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
352 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
353 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
354 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
355 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
356 a06c6ae8 Michael Hanselmann
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
357 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
358 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
359 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
360 8a3cd185 Michael Hanselmann
      self.assertFalse(job.archived)
361 8f5c488d Michael Hanselmann
362 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
363 8f5c488d Michael Hanselmann
    _Check(job1)
364 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
365 8f5c488d Michael Hanselmann
    _Check(job2)
366 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
367 8f5c488d Michael Hanselmann
368 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
369 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
370 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
371 c0f6d0d8 Michael Hanselmann
372 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
373 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
374 c0f6d0d8 Michael Hanselmann
375 8a3cd185 Michael Hanselmann
  def testArchived(self):
376 8a3cd185 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
377 8a3cd185 Michael Hanselmann
    self.assertFalse(job.archived)
378 8a3cd185 Michael Hanselmann
379 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
380 8a3cd185 Michael Hanselmann
    self.assertTrue(newjob.archived)
381 8a3cd185 Michael Hanselmann
382 8a3cd185 Michael Hanselmann
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
383 8a3cd185 Michael Hanselmann
    self.assertFalse(newjob2.archived)
384 8a3cd185 Michael Hanselmann
385 8f5c488d Michael Hanselmann
  def testPriority(self):
386 8f5c488d Michael Hanselmann
    job_id = 4283
387 8f5c488d Michael Hanselmann
    ops = [
388 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
389 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
390 8f5c488d Michael Hanselmann
      ]
391 8f5c488d Michael Hanselmann
392 8f5c488d Michael Hanselmann
    def _Check(job):
393 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
394 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
395 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
396 8f5c488d Michael Hanselmann
397 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
398 8f5c488d Michael Hanselmann
    _Check(job)
399 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
400 8f5c488d Michael Hanselmann
                            for op in job.ops))
401 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
402 8f5c488d Michael Hanselmann
403 8f5c488d Michael Hanselmann
    # Increase first
404 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
405 8f5c488d Michael Hanselmann
    _Check(job)
406 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
407 8f5c488d Michael Hanselmann
408 8f5c488d Michael Hanselmann
    # Mark opcode as finished
409 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
410 8f5c488d Michael Hanselmann
    _Check(job)
411 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
412 8f5c488d Michael Hanselmann
413 8f5c488d Michael Hanselmann
    # Increase second
414 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
415 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
416 8f5c488d Michael Hanselmann
417 8f5c488d Michael Hanselmann
    # Test increasing first
418 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
419 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
420 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
421 8f5c488d Michael Hanselmann
422 4679547e Michael Hanselmann
  def _JobForPriority(self, job_id):
423 4679547e Michael Hanselmann
    ops = [
424 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
425 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
426 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
427 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
428 4679547e Michael Hanselmann
      ]
429 4679547e Michael Hanselmann
430 4679547e Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
431 4679547e Michael Hanselmann
432 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
433 4679547e Michael Hanselmann
                               for op in job.ops))
434 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
435 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
436 4679547e Michael Hanselmann
                                for op in job.ops))
437 4679547e Michael Hanselmann
438 4679547e Michael Hanselmann
    return job
439 4679547e Michael Hanselmann
440 4679547e Michael Hanselmann
  def testChangePriorityAllQueued(self):
441 4679547e Michael Hanselmann
    job = self._JobForPriority(24984)
442 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
443 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
444 4679547e Michael Hanselmann
                               for op in job.ops))
445 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
446 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
447 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
448 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
449 3c631ea2 Michael Hanselmann
                                for op in job.ops))
450 4679547e Michael Hanselmann
    self.assertEqual(result,
451 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job 24984 have"
452 4679547e Michael Hanselmann
                             " been changed to -10")))
453 4679547e Michael Hanselmann
454 4679547e Michael Hanselmann
  def testChangePriorityAllFinished(self):
455 4679547e Michael Hanselmann
    job = self._JobForPriority(16405)
456 4679547e Michael Hanselmann
457 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
458 4679547e Michael Hanselmann
      if idx > 2:
459 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
460 4679547e Michael Hanselmann
      else:
461 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
462 4679547e Michael Hanselmann
463 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
464 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
465 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
466 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
468 4679547e Michael Hanselmann
                               for op in job.ops))
469 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
470 4679547e Michael Hanselmann
                                for op in job.ops))
471 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
472 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
473 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
474 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
475 4679547e Michael Hanselmann
      constants.OP_STATUS_ERROR,
476 4679547e Michael Hanselmann
      ])
477 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 16405 is finished"))
478 4679547e Michael Hanselmann
479 4679547e Michael Hanselmann
  def testChangePriorityCancelling(self):
480 4679547e Michael Hanselmann
    job = self._JobForPriority(31572)
481 4679547e Michael Hanselmann
482 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
483 4679547e Michael Hanselmann
      if idx > 1:
484 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
485 4679547e Michael Hanselmann
      else:
486 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
487 4679547e Michael Hanselmann
488 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
489 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
490 4679547e Michael Hanselmann
    result = job.ChangePriority(5)
491 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
492 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
493 4679547e Michael Hanselmann
                               for op in job.ops))
494 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
495 4679547e Michael Hanselmann
                                for op in job.ops))
496 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
497 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
498 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
499 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
500 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
501 4679547e Michael Hanselmann
      ])
502 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
503 4679547e Michael Hanselmann
504 4679547e Michael Hanselmann
  def testChangePriorityFirstRunning(self):
505 4679547e Michael Hanselmann
    job = self._JobForPriority(1716215889)
506 4679547e Michael Hanselmann
507 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
508 4679547e Michael Hanselmann
      if idx == 0:
509 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
510 4679547e Michael Hanselmann
      else:
511 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
512 4679547e Michael Hanselmann
513 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
514 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
515 4679547e Michael Hanselmann
    result = job.ChangePriority(7)
516 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
517 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
518 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
519 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
520 3c631ea2 Michael Hanselmann
                                for op in job.ops))
521 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
522 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
523 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
524 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
525 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
526 4679547e Michael Hanselmann
      ])
527 4679547e Michael Hanselmann
    self.assertEqual(result,
528 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
529 4679547e Michael Hanselmann
                             " 1716215889 have been changed to 7")))
530 4679547e Michael Hanselmann
531 4679547e Michael Hanselmann
  def testChangePriorityLastRunning(self):
532 4679547e Michael Hanselmann
    job = self._JobForPriority(1308)
533 4679547e Michael Hanselmann
534 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
535 4679547e Michael Hanselmann
      if idx == (len(job.ops) - 1):
536 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
537 4679547e Michael Hanselmann
      else:
538 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
539 4679547e Michael Hanselmann
540 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
541 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
542 4679547e Michael Hanselmann
    result = job.ChangePriority(-3)
543 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
544 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
545 4679547e Michael Hanselmann
                               for op in job.ops))
546 4679547e Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
547 4679547e Michael Hanselmann
                                for op in job.ops))
548 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
549 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
550 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
551 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
552 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
553 4679547e Michael Hanselmann
      ])
554 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
555 4679547e Michael Hanselmann
556 4679547e Michael Hanselmann
  def testChangePrioritySecondOpcodeRunning(self):
557 4679547e Michael Hanselmann
    job = self._JobForPriority(27701)
558 4679547e Michael Hanselmann
559 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
560 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
561 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
562 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
563 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_QUEUED
564 4679547e Michael Hanselmann
565 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
566 4679547e Michael Hanselmann
    result = job.ChangePriority(-19)
567 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -19)
568 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
569 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
570 4679547e Michael Hanselmann
                      -19, -19])
571 3c631ea2 Michael Hanselmann
    self.assertFalse(compat.any(hasattr(op.input, "priority")
572 3c631ea2 Michael Hanselmann
                                for op in job.ops))
573 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
574 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
575 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
576 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
577 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
578 4679547e Michael Hanselmann
      ])
579 4679547e Michael Hanselmann
    self.assertEqual(result,
580 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
581 4679547e Michael Hanselmann
                             " 27701 have been changed to -19")))
582 4679547e Michael Hanselmann
583 4679547e Michael Hanselmann
  def testChangePriorityWithInconsistentJob(self):
584 4679547e Michael Hanselmann
    job = self._JobForPriority(30097)
585 4679547e Michael Hanselmann
586 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
587 4679547e Michael Hanselmann
588 4679547e Michael Hanselmann
    # This job is invalid (as it has two opcodes marked as running) and make
589 4679547e Michael Hanselmann
    # the call fail because an unprocessed opcode precedes a running one (which
590 4679547e Michael Hanselmann
    # should never happen in reality)
591 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
592 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
593 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
594 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_RUNNING
595 4679547e Michael Hanselmann
596 4679547e Michael Hanselmann
    self.assertRaises(AssertionError, job.ChangePriority, 19)
597 4679547e Michael Hanselmann
598 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
599 db5bce34 Michael Hanselmann
    def _Queued(ops):
600 db5bce34 Michael Hanselmann
      # The default status is "queued"
601 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
602 db5bce34 Michael Hanselmann
                              for op in ops))
603 db5bce34 Michael Hanselmann
604 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
605 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
606 db5bce34 Michael Hanselmann
607 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
608 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
609 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
610 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
611 db5bce34 Michael Hanselmann
612 db5bce34 Michael Hanselmann
    def _Running(ops):
613 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
614 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
615 db5bce34 Michael Hanselmann
      for op in ops[2:]:
616 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
617 db5bce34 Michael Hanselmann
618 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
619 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
620 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
621 db5bce34 Michael Hanselmann
      for op in ops[2:]:
622 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
623 db5bce34 Michael Hanselmann
624 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
625 db5bce34 Michael Hanselmann
      for op in ops:
626 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
627 db5bce34 Michael Hanselmann
628 db5bce34 Michael Hanselmann
    def _Canceled(ops):
629 db5bce34 Michael Hanselmann
      for op in ops:
630 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
631 db5bce34 Michael Hanselmann
632 db5bce34 Michael Hanselmann
    def _Error1(ops):
633 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
634 db5bce34 Michael Hanselmann
        if idx > 3:
635 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
636 db5bce34 Michael Hanselmann
        else:
637 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
638 db5bce34 Michael Hanselmann
639 db5bce34 Michael Hanselmann
    def _Error2(ops):
640 db5bce34 Michael Hanselmann
      for op in ops:
641 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
642 db5bce34 Michael Hanselmann
643 db5bce34 Michael Hanselmann
    def _Success(ops):
644 db5bce34 Michael Hanselmann
      for op in ops:
645 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
646 db5bce34 Michael Hanselmann
647 db5bce34 Michael Hanselmann
    tests = {
648 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
649 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
650 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
651 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
652 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
653 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
654 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
655 db5bce34 Michael Hanselmann
      }
656 db5bce34 Michael Hanselmann
657 db5bce34 Michael Hanselmann
    def _NewJob():
658 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
659 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
660 c0f6d0d8 Michael Hanselmann
                              True)
661 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
663 db5bce34 Michael Hanselmann
                              for op in job.ops))
664 db5bce34 Michael Hanselmann
      return job
665 db5bce34 Michael Hanselmann
666 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
667 db5bce34 Michael Hanselmann
      sttests = tests[status]
668 db5bce34 Michael Hanselmann
      assert sttests
669 db5bce34 Michael Hanselmann
      for fn in sttests:
670 db5bce34 Michael Hanselmann
        job = _NewJob()
671 db5bce34 Michael Hanselmann
        fn(job.ops)
672 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
673 db5bce34 Michael Hanselmann
674 8f5c488d Michael Hanselmann
675 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
676 be760ba8 Michael Hanselmann
  def __init__(self):
677 b95479a5 Michael Hanselmann
    self._checks = []
678 b95479a5 Michael Hanselmann
    self._notifications = []
679 b95479a5 Michael Hanselmann
    self._waiting = set()
680 b95479a5 Michael Hanselmann
681 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
682 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
683 b95479a5 Michael Hanselmann
684 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
685 b95479a5 Michael Hanselmann
    return len(self._checks)
686 b95479a5 Michael Hanselmann
687 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
688 b95479a5 Michael Hanselmann
    return len(self._waiting)
689 b95479a5 Michael Hanselmann
690 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
691 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
692 b95479a5 Michael Hanselmann
693 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
694 b95479a5 Michael Hanselmann
    return job in self._waiting
695 b95479a5 Michael Hanselmann
696 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
697 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
698 b95479a5 Michael Hanselmann
699 b95479a5 Michael Hanselmann
    assert exp_job == job
700 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
701 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
702 b95479a5 Michael Hanselmann
703 b95479a5 Michael Hanselmann
    (result_status, _) = result
704 b95479a5 Michael Hanselmann
705 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
706 b95479a5 Michael Hanselmann
      self._waiting.add(job)
707 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
708 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
709 b95479a5 Michael Hanselmann
710 b95479a5 Michael Hanselmann
    return result
711 b95479a5 Michael Hanselmann
712 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
713 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
714 b95479a5 Michael Hanselmann
715 b95479a5 Michael Hanselmann
716 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
717 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
718 b95479a5 Michael Hanselmann
    return False
719 b95479a5 Michael Hanselmann
720 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
721 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
722 b95479a5 Michael Hanselmann
723 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
724 b95479a5 Michael Hanselmann
    pass
725 b95479a5 Michael Hanselmann
726 b95479a5 Michael Hanselmann
727 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
728 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
729 be760ba8 Michael Hanselmann
    self._acquired = False
730 ebb2a2a3 Michael Hanselmann
    self._updates = []
731 6a373640 Michael Hanselmann
    self._submitted = []
732 942e2262 Michael Hanselmann
    self._accepting_jobs = True
733 6a373640 Michael Hanselmann
734 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
735 be760ba8 Michael Hanselmann
736 b95479a5 Michael Hanselmann
    if depmgr:
737 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
738 b95479a5 Michael Hanselmann
    else:
739 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
740 b95479a5 Michael Hanselmann
741 be760ba8 Michael Hanselmann
  def IsAcquired(self):
742 be760ba8 Michael Hanselmann
    return self._acquired
743 be760ba8 Michael Hanselmann
744 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
745 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
746 ebb2a2a3 Michael Hanselmann
747 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
748 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
749 6a373640 Michael Hanselmann
750 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
751 be760ba8 Michael Hanselmann
    assert shared == 1
752 be760ba8 Michael Hanselmann
    self._acquired = True
753 be760ba8 Michael Hanselmann
754 be760ba8 Michael Hanselmann
  def release(self):
755 be760ba8 Michael Hanselmann
    assert self._acquired
756 be760ba8 Michael Hanselmann
    self._acquired = False
757 be760ba8 Michael Hanselmann
758 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
759 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
760 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
761 be760ba8 Michael Hanselmann
762 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
763 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
764 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
765 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
766 6a373640 Michael Hanselmann
    return job_ids
767 6a373640 Michael Hanselmann
768 942e2262 Michael Hanselmann
  def StopAcceptingJobs(self):
769 942e2262 Michael Hanselmann
    self._accepting_jobs = False
770 942e2262 Michael Hanselmann
771 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
772 942e2262 Michael Hanselmann
    return self._accepting_jobs
773 942e2262 Michael Hanselmann
774 be760ba8 Michael Hanselmann
775 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
776 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
777 ebb2a2a3 Michael Hanselmann
    self._queue = queue
778 be760ba8 Michael Hanselmann
    self._before_start = before_start
779 be760ba8 Michael Hanselmann
    self._after_start = after_start
780 be760ba8 Michael Hanselmann
781 e4e59de8 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None):
782 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
783 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
784 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
785 be760ba8 Michael Hanselmann
786 be760ba8 Michael Hanselmann
    if self._before_start:
787 e4e59de8 Michael Hanselmann
      self._before_start(timeout, cbs.CurrentPriority())
788 be760ba8 Michael Hanselmann
789 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
790 be760ba8 Michael Hanselmann
791 be760ba8 Michael Hanselmann
    if self._after_start:
792 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
793 be760ba8 Michael Hanselmann
794 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
795 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
796 ebb2a2a3 Michael Hanselmann
797 be760ba8 Michael Hanselmann
    if op.fail:
798 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
799 be760ba8 Michael Hanselmann
800 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
801 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
802 6a373640 Michael Hanselmann
803 be760ba8 Michael Hanselmann
    return op.result
804 be760ba8 Michael Hanselmann
805 be760ba8 Michael Hanselmann
806 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
807 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
808 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
809 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
810 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
811 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
812 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
813 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
814 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
815 be760ba8 Michael Hanselmann
    return job
816 be760ba8 Michael Hanselmann
817 26d3fd2f Michael Hanselmann
818 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
819 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
820 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
821 be760ba8 Michael Hanselmann
                      for op in job.ops)
822 be760ba8 Michael Hanselmann
823 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
824 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
825 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
826 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
827 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
828 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
829 be760ba8 Michael Hanselmann
                      job.start_timestamp,
830 be760ba8 Michael Hanselmann
                      job.end_timestamp])
831 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
832 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
833 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
834 be760ba8 Michael Hanselmann
835 be760ba8 Michael Hanselmann
  def testSuccess(self):
836 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
837 be760ba8 Michael Hanselmann
838 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
839 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
840 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
841 be760ba8 Michael Hanselmann
             for i in range(opcount)]
842 be760ba8 Michael Hanselmann
843 be760ba8 Michael Hanselmann
      # Create job
844 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
845 be760ba8 Michael Hanselmann
846 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
847 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
848 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
849 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
850 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
851 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
852 be760ba8 Michael Hanselmann
853 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
854 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
855 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
856 ebb2a2a3 Michael Hanselmann
857 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
858 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
860 be760ba8 Michael Hanselmann
861 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
862 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
863 be760ba8 Michael Hanselmann
        self.assertFalse(success)
864 be760ba8 Michael Hanselmann
865 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
866 be760ba8 Michael Hanselmann
867 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
868 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
869 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
870 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
871 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
872 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
873 be760ba8 Michael Hanselmann
          # Last opcode
874 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
875 be760ba8 Michael Hanselmann
        else:
876 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
877 be760ba8 Michael Hanselmann
878 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
880 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
881 be760ba8 Michael Hanselmann
882 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
883 ebb2a2a3 Michael Hanselmann
884 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
885 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
886 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
887 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
888 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
889 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
890 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
891 be760ba8 Michael Hanselmann
                              for op in job.ops))
892 be760ba8 Michael Hanselmann
893 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
894 be760ba8 Michael Hanselmann
895 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
896 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
897 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
898 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
899 be760ba8 Michael Hanselmann
900 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
901 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
902 be760ba8 Michael Hanselmann
903 be760ba8 Michael Hanselmann
    testdata = [
904 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
905 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
906 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
907 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
908 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
909 be760ba8 Michael Hanselmann
      ]
910 be760ba8 Michael Hanselmann
911 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
912 be760ba8 Michael Hanselmann
      # Prepare opcodes
913 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
914 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
915 be760ba8 Michael Hanselmann
                                       i <= failto))
916 be760ba8 Michael Hanselmann
             for i in range(opcount)]
917 be760ba8 Michael Hanselmann
918 be760ba8 Michael Hanselmann
      # Create job
919 a06c6ae8 Michael Hanselmann
      job = self._CreateJob(queue, str(job_id), ops)
920 be760ba8 Michael Hanselmann
921 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
922 be760ba8 Michael Hanselmann
923 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
924 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
925 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
926 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
927 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
928 ebb2a2a3 Michael Hanselmann
        # waitlock to running
929 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
930 ebb2a2a3 Michael Hanselmann
        # Opcode result
931 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
932 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
933 be760ba8 Michael Hanselmann
934 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
935 be760ba8 Michael Hanselmann
          # Last opcode
936 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
937 be760ba8 Michael Hanselmann
          break
938 be760ba8 Michael Hanselmann
939 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
940 be760ba8 Michael Hanselmann
941 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942 be760ba8 Michael Hanselmann
943 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
944 ebb2a2a3 Michael Hanselmann
945 be760ba8 Michael Hanselmann
      # Check job status
946 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
947 76b62028 Iustin Pop
      self.assertEqual(job.GetInfo(["id"]), [job_id])
948 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
949 be760ba8 Michael Hanselmann
950 be760ba8 Michael Hanselmann
      # Check opcode status
951 be760ba8 Michael Hanselmann
      data = zip(job.ops,
952 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
953 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
954 be760ba8 Michael Hanselmann
955 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
956 be760ba8 Michael Hanselmann
        if idx < failfrom:
957 be760ba8 Michael Hanselmann
          assert not op.input.fail
958 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
959 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
960 be760ba8 Michael Hanselmann
        elif idx <= failto:
961 be760ba8 Michael Hanselmann
          assert op.input.fail
962 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
963 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
964 be760ba8 Michael Hanselmann
        else:
965 be760ba8 Michael Hanselmann
          assert not op.input.fail
966 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
967 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
968 be760ba8 Michael Hanselmann
969 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
970 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
971 be760ba8 Michael Hanselmann
972 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
973 be760ba8 Michael Hanselmann
974 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
975 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
976 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
977 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
978 be760ba8 Michael Hanselmann
979 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
980 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
981 be760ba8 Michael Hanselmann
982 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
983 be760ba8 Michael Hanselmann
           for i in range(5)]
984 be760ba8 Michael Hanselmann
985 be760ba8 Michael Hanselmann
    # Create job
986 be760ba8 Michael Hanselmann
    job_id = 17045
987 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
988 be760ba8 Michael Hanselmann
989 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
990 be760ba8 Michael Hanselmann
991 be760ba8 Michael Hanselmann
    # Mark as cancelled
992 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
993 be760ba8 Michael Hanselmann
    self.assert_(success)
994 be760ba8 Michael Hanselmann
995 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
996 ebb2a2a3 Michael Hanselmann
997 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
998 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
999 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
1000 be760ba8 Michael Hanselmann
                            for op in job.ops))
1001 be760ba8 Michael Hanselmann
1002 66bd7445 Michael Hanselmann
    # Serialize to check for differences
1003 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
1004 66bd7445 Michael Hanselmann
1005 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
1006 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1007 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1008 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1009 30c945d0 Michael Hanselmann
1010 30c945d0 Michael Hanselmann
    # Check result
1011 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1012 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1013 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1014 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
1015 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1016 30c945d0 Michael Hanselmann
                                for op in job.ops))
1017 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1018 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1019 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1020 30c945d0 Michael Hanselmann
1021 66bd7445 Michael Hanselmann
    # Must not have changed or written
1022 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
1023 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1024 66bd7445 Michael Hanselmann
1025 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
1026 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
1027 30c945d0 Michael Hanselmann
1028 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1029 30c945d0 Michael Hanselmann
           for i in range(5)]
1030 30c945d0 Michael Hanselmann
1031 30c945d0 Michael Hanselmann
    # Create job
1032 30c945d0 Michael Hanselmann
    job_id = 8645
1033 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1034 30c945d0 Michael Hanselmann
1035 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1036 30c945d0 Michael Hanselmann
1037 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1038 30c945d0 Michael Hanselmann
1039 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
1040 30c945d0 Michael Hanselmann
1041 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1042 30c945d0 Michael Hanselmann
1043 30c945d0 Michael Hanselmann
    # Mark as cancelling
1044 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
1045 30c945d0 Michael Hanselmann
    self.assert_(success)
1046 30c945d0 Michael Hanselmann
1047 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1048 30c945d0 Michael Hanselmann
1049 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1050 30c945d0 Michael Hanselmann
                            for op in job.ops))
1051 30c945d0 Michael Hanselmann
1052 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1053 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1054 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1055 be760ba8 Michael Hanselmann
1056 be760ba8 Michael Hanselmann
    # Check result
1057 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1058 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1059 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1060 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1061 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1062 be760ba8 Michael Hanselmann
                                for op in job.ops))
1063 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1064 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1065 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1066 be760ba8 Michael Hanselmann
1067 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
1068 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1069 be760ba8 Michael Hanselmann
1070 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1071 be760ba8 Michael Hanselmann
           for i in range(5)]
1072 be760ba8 Michael Hanselmann
1073 be760ba8 Michael Hanselmann
    # Create job
1074 be760ba8 Michael Hanselmann
    job_id = 11009
1075 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1076 be760ba8 Michael Hanselmann
1077 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1078 be760ba8 Michael Hanselmann
1079 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1080 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1081 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1082 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1083 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1084 be760ba8 Michael Hanselmann
1085 be760ba8 Michael Hanselmann
      # Mark as cancelled
1086 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
1087 be760ba8 Michael Hanselmann
      self.assert_(success)
1088 be760ba8 Michael Hanselmann
1089 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1090 be760ba8 Michael Hanselmann
                              for op in job.ops))
1091 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1092 be760ba8 Michael Hanselmann
1093 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1094 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1095 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1096 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1097 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1098 be760ba8 Michael Hanselmann
1099 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1100 be760ba8 Michael Hanselmann
1101 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1102 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1103 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1104 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1105 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1106 be760ba8 Michael Hanselmann
1107 be760ba8 Michael Hanselmann
    # Check result
1108 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1109 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1110 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
1111 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1112 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1113 be760ba8 Michael Hanselmann
                                for op in job.ops))
1114 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1115 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1116 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1117 be760ba8 Michael Hanselmann
1118 942e2262 Michael Hanselmann
  def _TestCancelWhileSomething(self, cb):
1119 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
1120 9e49dfc5 Michael Hanselmann
1121 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1122 9e49dfc5 Michael Hanselmann
           for i in range(5)]
1123 9e49dfc5 Michael Hanselmann
1124 9e49dfc5 Michael Hanselmann
    # Create job
1125 9e49dfc5 Michael Hanselmann
    job_id = 24314
1126 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1127 9e49dfc5 Michael Hanselmann
1128 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1129 9e49dfc5 Michael Hanselmann
1130 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1131 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1132 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1133 9e49dfc5 Michael Hanselmann
1134 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
1135 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
1136 9e49dfc5 Michael Hanselmann
      self.assert_(success)
1137 9e49dfc5 Michael Hanselmann
1138 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1139 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
1140 9e49dfc5 Michael Hanselmann
1141 942e2262 Michael Hanselmann
      cb(queue)
1142 9e49dfc5 Michael Hanselmann
1143 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
1144 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
1145 9e49dfc5 Michael Hanselmann
1146 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1147 9e49dfc5 Michael Hanselmann
1148 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1149 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1150 9e49dfc5 Michael Hanselmann
1151 9e49dfc5 Michael Hanselmann
    # Check result
1152 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1153 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1154 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
1155 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
1156 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1157 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
1158 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1159 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1160 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1161 9e49dfc5 Michael Hanselmann
1162 942e2262 Michael Hanselmann
    return queue
1163 942e2262 Michael Hanselmann
1164 942e2262 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
1165 942e2262 Michael Hanselmann
    def fn(_):
1166 942e2262 Michael Hanselmann
      # Fake an acquire attempt timing out
1167 942e2262 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1168 942e2262 Michael Hanselmann
1169 942e2262 Michael Hanselmann
    self._TestCancelWhileSomething(fn)
1170 942e2262 Michael Hanselmann
1171 942e2262 Michael Hanselmann
  def testCancelDuringQueueShutdown(self):
1172 942e2262 Michael Hanselmann
    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1173 942e2262 Michael Hanselmann
    self.assertFalse(queue.AcceptingJobsUnlocked())
1174 942e2262 Michael Hanselmann
1175 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
1176 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
1177 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1178 be760ba8 Michael Hanselmann
1179 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1180 be760ba8 Michael Hanselmann
           for i in range(3)]
1181 be760ba8 Michael Hanselmann
1182 be760ba8 Michael Hanselmann
    # Create job
1183 76b62028 Iustin Pop
    job_id = 28492
1184 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1185 be760ba8 Michael Hanselmann
1186 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1187 be760ba8 Michael Hanselmann
1188 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1189 be760ba8 Michael Hanselmann
1190 be760ba8 Michael Hanselmann
    # Run one opcode
1191 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1192 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1193 be760ba8 Michael Hanselmann
1194 be760ba8 Michael Hanselmann
    # Job goes back to queued
1195 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1196 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1197 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1198 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1199 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1200 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
1201 be760ba8 Michael Hanselmann
1202 be760ba8 Michael Hanselmann
    # Mark as cancelled
1203 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
1204 be760ba8 Michael Hanselmann
    self.assert_(success)
1205 be760ba8 Michael Hanselmann
1206 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
1207 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1208 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1209 be760ba8 Michael Hanselmann
1210 be760ba8 Michael Hanselmann
    # Check result
1211 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1212 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
1213 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1214 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1215 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1216 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1217 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1218 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1219 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
1220 be760ba8 Michael Hanselmann
1221 942e2262 Michael Hanselmann
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
1222 942e2262 Michael Hanselmann
    self.assertTrue(queue.AcceptingJobsUnlocked())
1223 942e2262 Michael Hanselmann
1224 942e2262 Michael Hanselmann
    # Simulate shutdown
1225 942e2262 Michael Hanselmann
    queue.StopAcceptingJobs()
1226 942e2262 Michael Hanselmann
1227 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1228 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1229 942e2262 Michael Hanselmann
1230 942e2262 Michael Hanselmann
    # Check result
1231 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1232 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1233 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1234 942e2262 Michael Hanselmann
    self.assertTrue(job.start_timestamp)
1235 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1236 942e2262 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1237 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1238 942e2262 Michael Hanselmann
                               for op in job.ops[:runcount]))
1239 942e2262 Michael Hanselmann
    self.assertFalse(job.ops[runcount].end_timestamp)
1240 942e2262 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1241 942e2262 Michael Hanselmann
                                for op in job.ops[(runcount + 1):]))
1242 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1243 942e2262 Michael Hanselmann
                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
1244 942e2262 Michael Hanselmann
                       ([constants.OP_STATUS_QUEUED] *
1245 942e2262 Michael Hanselmann
                        (len(job.ops) - runcount))),
1246 942e2262 Michael Hanselmann
                      (["Res%s" % i for i in range(runcount)] +
1247 942e2262 Michael Hanselmann
                       ([None] * (len(job.ops) - runcount)))])
1248 942e2262 Michael Hanselmann
1249 942e2262 Michael Hanselmann
    # Must have been written and replicated
1250 942e2262 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1251 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1252 942e2262 Michael Hanselmann
1253 942e2262 Michael Hanselmann
  def testQueueShutdownWhileRunning(self):
1254 942e2262 Michael Hanselmann
    # Tests shutting down the queue while a job is running
1255 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1256 942e2262 Michael Hanselmann
1257 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1258 942e2262 Michael Hanselmann
           for i in range(3)]
1259 942e2262 Michael Hanselmann
1260 942e2262 Michael Hanselmann
    # Create job
1261 942e2262 Michael Hanselmann
    job_id = 2718211587
1262 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1263 942e2262 Michael Hanselmann
1264 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1265 942e2262 Michael Hanselmann
1266 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1267 942e2262 Michael Hanselmann
1268 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1269 942e2262 Michael Hanselmann
1270 942e2262 Michael Hanselmann
    # Run one opcode
1271 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1272 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1273 942e2262 Michael Hanselmann
1274 942e2262 Michael Hanselmann
    # Job goes back to queued
1275 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1276 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1277 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1278 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1279 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1280 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1281 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1282 942e2262 Michael Hanselmann
1283 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1284 942e2262 Michael Hanselmann
    for _ in range(3):
1285 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1286 942e2262 Michael Hanselmann
1287 942e2262 Michael Hanselmann
    # Run second opcode
1288 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1289 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1290 942e2262 Michael Hanselmann
1291 942e2262 Michael Hanselmann
    # Job goes back to queued
1292 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1293 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1294 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1295 942e2262 Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
1296 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1297 942e2262 Michael Hanselmann
                      ["Res0", "Res1", None]])
1298 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1299 942e2262 Michael Hanselmann
1300 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1301 942e2262 Michael Hanselmann
    for _ in range(3):
1302 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1303 942e2262 Michael Hanselmann
1304 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 2)
1305 942e2262 Michael Hanselmann
1306 942e2262 Michael Hanselmann
  def testQueueShutdownWithLockTimeout(self):
1307 942e2262 Michael Hanselmann
    # Tests shutting down while a lock acquire times out
1308 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1309 942e2262 Michael Hanselmann
1310 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1311 942e2262 Michael Hanselmann
           for i in range(3)]
1312 942e2262 Michael Hanselmann
1313 942e2262 Michael Hanselmann
    # Create job
1314 942e2262 Michael Hanselmann
    job_id = 1304231178
1315 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1316 942e2262 Michael Hanselmann
1317 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1318 942e2262 Michael Hanselmann
1319 942e2262 Michael Hanselmann
    acquire_timeout = False
1320 942e2262 Michael Hanselmann
1321 942e2262 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1322 942e2262 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1323 942e2262 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1324 942e2262 Michael Hanselmann
      if acquire_timeout:
1325 942e2262 Michael Hanselmann
        raise mcpu.LockAcquireTimeout()
1326 942e2262 Michael Hanselmann
1327 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1328 942e2262 Michael Hanselmann
1329 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1330 942e2262 Michael Hanselmann
1331 942e2262 Michael Hanselmann
    # Run one opcode
1332 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1333 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1334 942e2262 Michael Hanselmann
1335 942e2262 Michael Hanselmann
    # Job goes back to queued
1336 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1337 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1338 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1339 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1340 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1341 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1342 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1343 942e2262 Michael Hanselmann
1344 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1345 942e2262 Michael Hanselmann
    for _ in range(3):
1346 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1347 942e2262 Michael Hanselmann
1348 942e2262 Michael Hanselmann
    # The next opcode should have expiring lock acquires
1349 942e2262 Michael Hanselmann
    acquire_timeout = True
1350 942e2262 Michael Hanselmann
1351 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 1)
1352 942e2262 Michael Hanselmann
1353 942e2262 Michael Hanselmann
  def testQueueShutdownWhileInQueue(self):
1354 942e2262 Michael Hanselmann
    # This should never happen in reality (no new jobs are started by the
1355 942e2262 Michael Hanselmann
    # workerpool once a shutdown has been initiated), but it's better to test
1356 942e2262 Michael Hanselmann
    # the job processor for this scenario
1357 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1358 942e2262 Michael Hanselmann
1359 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1360 942e2262 Michael Hanselmann
           for i in range(5)]
1361 942e2262 Michael Hanselmann
1362 942e2262 Michael Hanselmann
    # Create job
1363 942e2262 Michael Hanselmann
    job_id = 2031
1364 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1365 942e2262 Michael Hanselmann
1366 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1367 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1368 942e2262 Michael Hanselmann
1369 942e2262 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1370 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1371 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1372 942e2262 Michael Hanselmann
                               for op in job.ops))
1373 942e2262 Michael Hanselmann
1374 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1375 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1376 942e2262 Michael Hanselmann
1377 942e2262 Michael Hanselmann
  def testQueueShutdownWhileWaitlockInQueue(self):
1378 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1379 942e2262 Michael Hanselmann
1380 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1381 942e2262 Michael Hanselmann
           for i in range(5)]
1382 942e2262 Michael Hanselmann
1383 942e2262 Michael Hanselmann
    # Create job
1384 942e2262 Michael Hanselmann
    job_id = 53125685
1385 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1386 942e2262 Michael Hanselmann
1387 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1388 942e2262 Michael Hanselmann
1389 942e2262 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1390 942e2262 Michael Hanselmann
1391 942e2262 Michael Hanselmann
    assert len(job.ops) == 5
1392 942e2262 Michael Hanselmann
1393 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1394 942e2262 Michael Hanselmann
1395 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1396 942e2262 Michael Hanselmann
1397 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1398 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1399 942e2262 Michael Hanselmann
1400 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
1401 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
1402 be760ba8 Michael Hanselmann
    # program was restarted
1403 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1404 be760ba8 Michael Hanselmann
1405 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1406 be760ba8 Michael Hanselmann
1407 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1408 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1409 be760ba8 Michael Hanselmann
             for i in range(10)]
1410 be760ba8 Michael Hanselmann
1411 be760ba8 Michael Hanselmann
      # Create job
1412 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
1413 be760ba8 Michael Hanselmann
1414 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1415 be760ba8 Michael Hanselmann
1416 be760ba8 Michael Hanselmann
      for _ in range(successcount):
1417 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1418 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
1419 be760ba8 Michael Hanselmann
1420 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1421 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
1422 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
1423 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
1424 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
1425 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
1426 be760ba8 Michael Hanselmann
1427 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
1428 be760ba8 Michael Hanselmann
1429 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
1430 8a3cd185 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1431 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
1432 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
1433 be760ba8 Michael Hanselmann
1434 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1435 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1436 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1437 be760ba8 Michael Hanselmann
1438 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1439 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1440 be760ba8 Michael Hanselmann
1441 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1442 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1443 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1444 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1445 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1446 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1447 be760ba8 Michael Hanselmann
1448 be760ba8 Michael Hanselmann
      if remaining == 0:
1449 be760ba8 Michael Hanselmann
        # Last opcode
1450 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1451 be760ba8 Michael Hanselmann
        break
1452 be760ba8 Michael Hanselmann
1453 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1454 be760ba8 Michael Hanselmann
1455 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1456 be760ba8 Michael Hanselmann
1457 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1458 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1459 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1460 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1461 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1462 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1463 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1464 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1465 be760ba8 Michael Hanselmann
                            for op in job.ops))
1466 be760ba8 Michael Hanselmann
1467 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1468 be760ba8 Michael Hanselmann
1469 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1470 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1471 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1472 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1473 be760ba8 Michael Hanselmann
1474 be760ba8 Michael Hanselmann
    # ... also after being restored
1475 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1476 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1477 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1478 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1479 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1480 be760ba8 Michael Hanselmann
1481 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1482 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1483 be760ba8 Michael Hanselmann
1484 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1485 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1486 be760ba8 Michael Hanselmann
1487 be760ba8 Michael Hanselmann
    # Create job
1488 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1489 be760ba8 Michael Hanselmann
1490 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1491 be760ba8 Michael Hanselmann
1492 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1493 be760ba8 Michael Hanselmann
1494 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1495 be760ba8 Michael Hanselmann
1496 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1497 be760ba8 Michael Hanselmann
1498 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1499 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1500 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1501 be760ba8 Michael Hanselmann
1502 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1503 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1504 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1505 be760ba8 Michael Hanselmann
1506 be760ba8 Michael Hanselmann
    messages = {
1507 be760ba8 Michael Hanselmann
      1: [
1508 be760ba8 Michael Hanselmann
        (None, "Hello"),
1509 be760ba8 Michael Hanselmann
        (None, "World"),
1510 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1511 be760ba8 Michael Hanselmann
        ],
1512 be760ba8 Michael Hanselmann
      4: [
1513 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1514 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1515 be760ba8 Michael Hanselmann
        ],
1516 be760ba8 Michael Hanselmann
      }
1517 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1518 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1519 be760ba8 Michael Hanselmann
           for i in range(5)]
1520 be760ba8 Michael Hanselmann
1521 be760ba8 Michael Hanselmann
    # Create job
1522 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1523 be760ba8 Michael Hanselmann
1524 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1525 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1526 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1527 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1528 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1529 be760ba8 Michael Hanselmann
1530 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1531 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1532 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1533 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1534 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1535 be760ba8 Michael Hanselmann
1536 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1537 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1538 be760ba8 Michael Hanselmann
1539 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1540 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1541 be760ba8 Michael Hanselmann
        if log_type:
1542 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1543 be760ba8 Michael Hanselmann
        else:
1544 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1545 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1546 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1547 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1548 be760ba8 Michael Hanselmann
1549 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1550 be760ba8 Michael Hanselmann
1551 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1552 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1553 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1554 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1555 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1556 be760ba8 Michael Hanselmann
1557 be760ba8 Michael Hanselmann
      if remaining == 0:
1558 be760ba8 Michael Hanselmann
        # Last opcode
1559 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1560 be760ba8 Michael Hanselmann
        break
1561 be760ba8 Michael Hanselmann
1562 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1563 be760ba8 Michael Hanselmann
1564 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1565 be760ba8 Michael Hanselmann
1566 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1567 ebb2a2a3 Michael Hanselmann
1568 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1569 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1570 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1571 be760ba8 Michael Hanselmann
1572 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1573 be760ba8 Michael Hanselmann
1574 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1575 be760ba8 Michael Hanselmann
1576 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1577 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1578 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1579 be760ba8 Michael Hanselmann
1580 be760ba8 Michael Hanselmann
    # Check each message
1581 be760ba8 Michael Hanselmann
    prevserial = -1
1582 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1583 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1584 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1585 be760ba8 Michael Hanselmann
        if exptype:
1586 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1587 be760ba8 Michael Hanselmann
        else:
1588 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1589 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1590 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1591 be760ba8 Michael Hanselmann
        prevserial = serial
1592 be760ba8 Michael Hanselmann
1593 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1594 be760ba8 Michael Hanselmann
    # Check serial
1595 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1596 be760ba8 Michael Hanselmann
1597 be760ba8 Michael Hanselmann
    # No filter
1598 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1599 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1600 be760ba8 Michael Hanselmann
                      for entry in entries])
1601 be760ba8 Michael Hanselmann
1602 be760ba8 Michael Hanselmann
    # Filter with serial
1603 be760ba8 Michael Hanselmann
    assert count > 3
1604 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1605 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1606 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1607 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1608 be760ba8 Michael Hanselmann
1609 be760ba8 Michael Hanselmann
    # No log message after highest serial
1610 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1611 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1612 be760ba8 Michael Hanselmann
1613 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1614 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1615 6a373640 Michael Hanselmann
1616 6a373640 Michael Hanselmann
    job_id = 15656
1617 6a373640 Michael Hanselmann
    ops = [
1618 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1619 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1620 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1621 6a373640 Michael Hanselmann
                          submit_jobs=[
1622 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1623 6a373640 Michael Hanselmann
                            ]),
1624 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1625 6a373640 Michael Hanselmann
                          submit_jobs=[
1626 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1627 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1628 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1629 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1630 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1631 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1632 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1633 6a373640 Michael Hanselmann
                            ]),
1634 6a373640 Michael Hanselmann
      ]
1635 6a373640 Michael Hanselmann
1636 6a373640 Michael Hanselmann
    # Create job
1637 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1638 6a373640 Michael Hanselmann
1639 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1640 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1641 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1642 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1643 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1644 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1645 6a373640 Michael Hanselmann
1646 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1647 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1648 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1649 6a373640 Michael Hanselmann
1650 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1651 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1652 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1653 6a373640 Michael Hanselmann
1654 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1655 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1656 6a373640 Michael Hanselmann
      self.assertFalse(success)
1657 6a373640 Michael Hanselmann
1658 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1659 6a373640 Michael Hanselmann
1660 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1661 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1662 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1663 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1664 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1665 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1666 6a373640 Michael Hanselmann
        # Last opcode
1667 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1668 6a373640 Michael Hanselmann
      else:
1669 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1670 6a373640 Michael Hanselmann
1671 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1672 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1673 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1674 6a373640 Michael Hanselmann
1675 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1676 6a373640 Michael Hanselmann
1677 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1678 6a373640 Michael Hanselmann
                                        for op in ops
1679 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1680 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1681 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1682 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1683 6a373640 Michael Hanselmann
1684 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1685 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1686 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1687 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1688 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1689 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1690 6a373640 Michael Hanselmann
1691 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1692 6a373640 Michael Hanselmann
1693 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1694 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1695 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1696 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1697 6a373640 Michael Hanselmann
1698 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1699 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1700 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1701 b95479a5 Michael Hanselmann
1702 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1703 b95479a5 Michael Hanselmann
1704 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1705 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1706 b95479a5 Michael Hanselmann
    job_id = 29929
1707 b95479a5 Michael Hanselmann
    ops = [
1708 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1709 b95479a5 Michael Hanselmann
                          depends=[
1710 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1711 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1712 b95479a5 Michael Hanselmann
                            ]),
1713 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1714 b95479a5 Michael Hanselmann
      ]
1715 b95479a5 Michael Hanselmann
1716 b95479a5 Michael Hanselmann
    # Create job
1717 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1718 b95479a5 Michael Hanselmann
1719 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1720 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1721 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1722 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1723 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1724 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1725 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1726 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1727 b95479a5 Michael Hanselmann
1728 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1729 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1730 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1731 b95479a5 Michael Hanselmann
1732 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1733 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1734 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1735 b95479a5 Michael Hanselmann
1736 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1737 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1738 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1739 b95479a5 Michael Hanselmann
1740 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1741 b95479a5 Michael Hanselmann
1742 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1743 b95479a5 Michael Hanselmann
1744 b95479a5 Michael Hanselmann
    counter = itertools.count()
1745 b95479a5 Michael Hanselmann
    while True:
1746 b95479a5 Michael Hanselmann
      attempt = counter.next()
1747 b95479a5 Michael Hanselmann
1748 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1749 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1750 b95479a5 Michael Hanselmann
1751 b95479a5 Michael Hanselmann
      if attempt < 2:
1752 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1753 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1754 b95479a5 Michael Hanselmann
      elif attempt == 2:
1755 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1756 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1757 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1758 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1759 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1760 b95479a5 Michael Hanselmann
      elif attempt < 5:
1761 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1762 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1763 b95479a5 Michael Hanselmann
      elif attempt == 5:
1764 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1765 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1766 b95479a5 Michael Hanselmann
      if attempt == 2:
1767 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1768 b95479a5 Michael Hanselmann
      elif attempt > 5:
1769 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1770 b95479a5 Michael Hanselmann
      else:
1771 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1772 b95479a5 Michael Hanselmann
1773 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1774 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1775 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1776 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1777 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1778 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1779 b95479a5 Michael Hanselmann
1780 b95479a5 Michael Hanselmann
      if attempt < 5:
1781 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1782 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1783 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1784 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1785 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1786 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1787 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1788 b95479a5 Michael Hanselmann
        continue
1789 b95479a5 Michael Hanselmann
1790 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1791 b95479a5 Michael Hanselmann
        # Last opcode
1792 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1793 b95479a5 Michael Hanselmann
        break
1794 b95479a5 Michael Hanselmann
1795 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1796 b95479a5 Michael Hanselmann
1797 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1798 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1799 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1800 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1801 b95479a5 Michael Hanselmann
1802 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1803 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1804 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1805 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1806 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1807 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1808 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1809 b95479a5 Michael Hanselmann
                               for op in job.ops))
1810 b95479a5 Michael Hanselmann
1811 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1812 b95479a5 Michael Hanselmann
1813 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1814 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1815 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1816 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1817 b95479a5 Michael Hanselmann
1818 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1819 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1820 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1821 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1822 b95479a5 Michael Hanselmann
1823 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1824 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1825 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1826 b95479a5 Michael Hanselmann
1827 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1828 b95479a5 Michael Hanselmann
1829 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1830 b95479a5 Michael Hanselmann
    job_id = 30876
1831 b95479a5 Michael Hanselmann
    ops = [
1832 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1833 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1834 b95479a5 Michael Hanselmann
                          depends=[
1835 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1836 b95479a5 Michael Hanselmann
                            ]),
1837 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1838 b95479a5 Michael Hanselmann
      ]
1839 b95479a5 Michael Hanselmann
1840 b95479a5 Michael Hanselmann
    # Create job
1841 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1842 b95479a5 Michael Hanselmann
1843 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1844 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1845 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1846 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1847 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1848 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1849 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1850 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1851 b95479a5 Michael Hanselmann
1852 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1853 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1854 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1855 b95479a5 Michael Hanselmann
1856 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1857 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1858 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1859 b95479a5 Michael Hanselmann
1860 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1861 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1862 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1863 b95479a5 Michael Hanselmann
1864 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1865 b95479a5 Michael Hanselmann
1866 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1867 b95479a5 Michael Hanselmann
1868 b95479a5 Michael Hanselmann
    counter = itertools.count()
1869 b95479a5 Michael Hanselmann
    while True:
1870 b95479a5 Michael Hanselmann
      attempt = counter.next()
1871 b95479a5 Michael Hanselmann
1872 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1873 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1874 b95479a5 Michael Hanselmann
1875 b95479a5 Michael Hanselmann
      if attempt == 0:
1876 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1877 b95479a5 Michael Hanselmann
        pass
1878 b95479a5 Michael Hanselmann
      elif attempt < 4:
1879 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1880 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1881 b95479a5 Michael Hanselmann
      elif attempt == 4:
1882 b95479a5 Michael Hanselmann
        # Other job was cancelled
1883 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1884 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1885 b95479a5 Michael Hanselmann
1886 b95479a5 Michael Hanselmann
      if attempt == 0:
1887 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1888 b95479a5 Michael Hanselmann
      else:
1889 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1890 b95479a5 Michael Hanselmann
1891 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1892 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1893 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1894 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1895 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1896 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1897 b95479a5 Michael Hanselmann
1898 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1899 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1900 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1901 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1902 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1903 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1904 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1905 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1906 b95479a5 Michael Hanselmann
        continue
1907 b95479a5 Michael Hanselmann
1908 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1909 b95479a5 Michael Hanselmann
        # Last opcode
1910 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1911 b95479a5 Michael Hanselmann
        break
1912 b95479a5 Michael Hanselmann
1913 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1914 b95479a5 Michael Hanselmann
1915 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1916 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1917 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1918 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1919 b95479a5 Michael Hanselmann
1920 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1921 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1922 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1923 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1924 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1925 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1926 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1927 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1928 b95479a5 Michael Hanselmann
1929 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1930 b95479a5 Michael Hanselmann
1931 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1932 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1933 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1934 b95479a5 Michael Hanselmann
1935 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1936 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1937 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1938 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1939 b95479a5 Michael Hanselmann
1940 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1941 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1942 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1943 b95479a5 Michael Hanselmann
1944 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1945 b95479a5 Michael Hanselmann
1946 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1947 b95479a5 Michael Hanselmann
    job_id = 11763
1948 b95479a5 Michael Hanselmann
    ops = [
1949 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1950 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1951 b95479a5 Michael Hanselmann
                          depends=[
1952 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1953 b95479a5 Michael Hanselmann
                            ]),
1954 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1955 b95479a5 Michael Hanselmann
      ]
1956 b95479a5 Michael Hanselmann
1957 b95479a5 Michael Hanselmann
    # Create job
1958 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1959 b95479a5 Michael Hanselmann
1960 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1961 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1962 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1963 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1964 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1965 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1966 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1967 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1968 b95479a5 Michael Hanselmann
1969 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1970 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1971 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1972 b95479a5 Michael Hanselmann
1973 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1974 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1975 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1976 b95479a5 Michael Hanselmann
1977 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1978 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1979 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1980 b95479a5 Michael Hanselmann
1981 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1982 b95479a5 Michael Hanselmann
1983 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1984 b95479a5 Michael Hanselmann
1985 b95479a5 Michael Hanselmann
    counter = itertools.count()
1986 b95479a5 Michael Hanselmann
    while True:
1987 b95479a5 Michael Hanselmann
      attempt = counter.next()
1988 b95479a5 Michael Hanselmann
1989 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1990 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1991 b95479a5 Michael Hanselmann
1992 b95479a5 Michael Hanselmann
      if attempt == 0:
1993 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1994 b95479a5 Michael Hanselmann
        pass
1995 b95479a5 Michael Hanselmann
      elif attempt < 4:
1996 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1997 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1998 b95479a5 Michael Hanselmann
      elif attempt == 4:
1999 b95479a5 Michael Hanselmann
        # Other job failed
2000 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
2001 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
2002 b95479a5 Michael Hanselmann
2003 b95479a5 Michael Hanselmann
      if attempt == 0:
2004 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
2005 b95479a5 Michael Hanselmann
      else:
2006 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
2007 b95479a5 Michael Hanselmann
2008 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
2009 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
2010 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
2011 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
2012 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
2013 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
2014 b95479a5 Michael Hanselmann
2015 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
2016 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
2017 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
2018 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
2019 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2020 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
2021 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
2022 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
2023 b95479a5 Michael Hanselmann
        continue
2024 b95479a5 Michael Hanselmann
2025 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2026 b95479a5 Michael Hanselmann
        # Last opcode
2027 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2028 b95479a5 Michael Hanselmann
        break
2029 b95479a5 Michael Hanselmann
2030 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
2031 b95479a5 Michael Hanselmann
2032 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2033 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2034 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
2035 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2036 b95479a5 Michael Hanselmann
2037 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
2038 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
2039 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2040 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2041 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
2042 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
2043 b95479a5 Michael Hanselmann
2044 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
2045 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
2046 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
2047 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
2048 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
2049 b95479a5 Michael Hanselmann
2050 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
2051 b95479a5 Michael Hanselmann
2052 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2053 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2054 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
2055 b95479a5 Michael Hanselmann
2056 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2057 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2058 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2059 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2060 b95479a5 Michael Hanselmann
2061 be760ba8 Michael Hanselmann
2062 df5a5730 Michael Hanselmann
class TestEvaluateJobProcessorResult(unittest.TestCase):
2063 df5a5730 Michael Hanselmann
  def testFinished(self):
2064 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2065 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(30953)
2066 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2067 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.FINISHED)
2068 df5a5730 Michael Hanselmann
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2069 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2070 df5a5730 Michael Hanselmann
2071 df5a5730 Michael Hanselmann
  def testDefer(self):
2072 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2073 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(11326, priority=5463)
2074 df5a5730 Michael Hanselmann
    try:
2075 df5a5730 Michael Hanselmann
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2076 df5a5730 Michael Hanselmann
                                         jqueue._JobProcessor.DEFER)
2077 df5a5730 Michael Hanselmann
    except workerpool.DeferTask, err:
2078 df5a5730 Michael Hanselmann
      self.assertEqual(err.priority, 5463)
2079 df5a5730 Michael Hanselmann
    else:
2080 df5a5730 Michael Hanselmann
      self.fail("Didn't raise exception")
2081 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2082 df5a5730 Michael Hanselmann
2083 df5a5730 Michael Hanselmann
  def testWaitdep(self):
2084 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2085 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(21317)
2086 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2087 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.WAITDEP)
2088 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2089 df5a5730 Michael Hanselmann
2090 df5a5730 Michael Hanselmann
  def testOther(self):
2091 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2092 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(5813)
2093 df5a5730 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
2094 df5a5730 Michael Hanselmann
                      jqueue._EvaluateJobProcessorResult,
2095 df5a5730 Michael Hanselmann
                      depmgr, job, "Other result")
2096 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2097 df5a5730 Michael Hanselmann
2098 df5a5730 Michael Hanselmann
2099 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
2100 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
2101 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
2102 26d3fd2f Michael Hanselmann
    self.attempts = 0
2103 26d3fd2f Michael Hanselmann
    self.last_timeout = None
2104 26d3fd2f Michael Hanselmann
2105 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
2106 26d3fd2f Michael Hanselmann
    self.attempts += 1
2107 26d3fd2f Michael Hanselmann
    if self.timeouts:
2108 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
2109 26d3fd2f Michael Hanselmann
    else:
2110 26d3fd2f Michael Hanselmann
      timeout = None
2111 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
2112 26d3fd2f Michael Hanselmann
    return timeout
2113 26d3fd2f Michael Hanselmann
2114 26d3fd2f Michael Hanselmann
2115 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2116 26d3fd2f Michael Hanselmann
  def setUp(self):
2117 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
2118 26d3fd2f Michael Hanselmann
    self.job = None
2119 26d3fd2f Michael Hanselmann
    self.curop = None
2120 26d3fd2f Michael Hanselmann
    self.opcounter = None
2121 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
2122 26d3fd2f Michael Hanselmann
    self.retries = 0
2123 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
2124 26d3fd2f Michael Hanselmann
    self.prev_prio = None
2125 5fd6b694 Michael Hanselmann
    self.prev_status = None
2126 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
2127 26d3fd2f Michael Hanselmann
    self.gave_lock = None
2128 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
2129 26d3fd2f Michael Hanselmann
2130 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2131 26d3fd2f Michael Hanselmann
    job = self.job
2132 26d3fd2f Michael Hanselmann
2133 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
2134 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
2135 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2136 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2137 5fd6b694 Michael Hanselmann
2138 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2139 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2140 26d3fd2f Michael Hanselmann
2141 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
2142 26d3fd2f Michael Hanselmann
2143 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
2144 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
2145 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
2146 26d3fd2f Michael Hanselmann
2147 26d3fd2f Michael Hanselmann
    self.gave_lock = True
2148 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
2149 26d3fd2f Michael Hanselmann
2150 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
2151 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2152 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
2153 26d3fd2f Michael Hanselmann
      assert self.retries == 7
2154 26d3fd2f Michael Hanselmann
      self.retries = 0
2155 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
2156 26d3fd2f Michael Hanselmann
      return
2157 26d3fd2f Michael Hanselmann
2158 26d3fd2f Michael Hanselmann
    if self.retries > 0:
2159 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
2160 26d3fd2f Michael Hanselmann
      self.retries -= 1
2161 26d3fd2f Michael Hanselmann
      self.gave_lock = False
2162 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
2163 26d3fd2f Michael Hanselmann
2164 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2165 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2166 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
2167 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
2168 26d3fd2f Michael Hanselmann
2169 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
2170 26d3fd2f Michael Hanselmann
    job = self.job
2171 26d3fd2f Michael Hanselmann
2172 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
2173 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2174 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2175 5fd6b694 Michael Hanselmann
2176 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2177 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2178 26d3fd2f Michael Hanselmann
2179 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
2180 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
2181 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
2182 26d3fd2f Michael Hanselmann
2183 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
2184 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
2185 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
2186 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
2187 26d3fd2f Michael Hanselmann
2188 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
2189 26d3fd2f Michael Hanselmann
    job = self.job
2190 26d3fd2f Michael Hanselmann
2191 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
2192 26d3fd2f Michael Hanselmann
2193 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
2194 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
2195 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2196 26d3fd2f Michael Hanselmann
2197 26d3fd2f Michael Hanselmann
    if self.curop == 1:
2198 26d3fd2f Michael Hanselmann
      # Normal retry
2199 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2200 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
2201 26d3fd2f Michael Hanselmann
2202 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
2203 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
2204 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
2205 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2206 26d3fd2f Michael Hanselmann
2207 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
2208 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
2209 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
2210 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2211 26d3fd2f Michael Hanselmann
2212 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
2213 26d3fd2f Michael Hanselmann
2214 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
2215 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
2216 26d3fd2f Michael Hanselmann
2217 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
2218 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2219 26d3fd2f Michael Hanselmann
      self.retries = 0
2220 26d3fd2f Michael Hanselmann
2221 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
2222 26d3fd2f Michael Hanselmann
      # Normal retry
2223 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
2224 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2225 26d3fd2f Michael Hanselmann
2226 26d3fd2f Michael Hanselmann
    else:
2227 26d3fd2f Michael Hanselmann
      timeouts = []
2228 26d3fd2f Michael Hanselmann
      self.retries = 0
2229 26d3fd2f Michael Hanselmann
2230 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
2231 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
2232 26d3fd2f Michael Hanselmann
2233 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
2234 26d3fd2f Michael Hanselmann
2235 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
2236 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
2237 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
2238 26d3fd2f Michael Hanselmann
2239 26d3fd2f Michael Hanselmann
    return ts
2240 26d3fd2f Michael Hanselmann
2241 26d3fd2f Michael Hanselmann
  def testTimeout(self):
2242 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2243 26d3fd2f Michael Hanselmann
           for i in range(10)]
2244 26d3fd2f Michael Hanselmann
2245 26d3fd2f Michael Hanselmann
    # Create job
2246 26d3fd2f Michael Hanselmann
    job_id = 15801
2247 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2248 26d3fd2f Michael Hanselmann
    self.job = job
2249 26d3fd2f Michael Hanselmann
2250 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
2251 26d3fd2f Michael Hanselmann
2252 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2253 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
2254 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
2255 26d3fd2f Michael Hanselmann
2256 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
2257 26d3fd2f Michael Hanselmann
2258 5fd6b694 Michael Hanselmann
    while True:
2259 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
2260 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
2261 26d3fd2f Michael Hanselmann
2262 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2263 5fd6b694 Michael Hanselmann
2264 5fd6b694 Michael Hanselmann
      if self.curop is not None:
2265 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
2266 5fd6b694 Michael Hanselmann
2267 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
2268 5fd6b694 Michael Hanselmann
2269 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
2270 5fd6b694 Michael Hanselmann
      assert self.curop is not None
2271 5fd6b694 Michael Hanselmann
2272 3c631ea2 Michael Hanselmann
      # Input priority should never be set or modified
2273 3c631ea2 Michael Hanselmann
      self.assertFalse(compat.any(hasattr(op.input, "priority")
2274 3c631ea2 Michael Hanselmann
                                  for op in job.ops))
2275 3c631ea2 Michael Hanselmann
2276 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2277 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
2278 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2279 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2280 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
2281 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2282 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
2283 5fd6b694 Michael Hanselmann
2284 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2285 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2286 26d3fd2f Michael Hanselmann
        break
2287 26d3fd2f Michael Hanselmann
2288 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2289 26d3fd2f Michael Hanselmann
2290 5fd6b694 Michael Hanselmann
      if self.curop == 0:
2291 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
2292 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
2293 5fd6b694 Michael Hanselmann
2294 26d3fd2f Michael Hanselmann
      if self.gave_lock:
2295 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
2296 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2297 26d3fd2f Michael Hanselmann
      else:
2298 5fd6b694 Michael Hanselmann
        # Did not get locks
2299 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
2300 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2301 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
2302 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
2303 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2304 5fd6b694 Michael Hanselmann
2305 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
2306 5fd6b694 Michael Hanselmann
        # updated
2307 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
2308 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2309 5fd6b694 Michael Hanselmann
2310 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2311 26d3fd2f Michael Hanselmann
2312 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
2313 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2314 26d3fd2f Michael Hanselmann
2315 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
2316 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
2317 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
2318 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
2319 26d3fd2f Michael Hanselmann
2320 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2321 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2322 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2323 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
2324 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
2325 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2326 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2327 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2328 26d3fd2f Michael Hanselmann
                            for op in job.ops))
2329 26d3fd2f Michael Hanselmann
2330 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2331 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2332 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2333 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2334 26d3fd2f Michael Hanselmann
2335 26d3fd2f Michael Hanselmann
2336 4679547e Michael Hanselmann
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2337 4679547e Michael Hanselmann
  def setUp(self):
2338 4679547e Michael Hanselmann
    self.queue = _FakeQueueForProc()
2339 4679547e Michael Hanselmann
    self.opexecprio = []
2340 4679547e Michael Hanselmann
2341 4679547e Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2342 4679547e Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2343 4679547e Michael Hanselmann
    self.opexecprio.append(priority)
2344 4679547e Michael Hanselmann
2345 4679547e Michael Hanselmann
  def testChangePriorityWhileRunning(self):
2346 4679547e Michael Hanselmann
    # Tests changing the priority on a job while it has finished opcodes
2347 4679547e Michael Hanselmann
    # (successful) and more, unprocessed ones
2348 4679547e Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2349 4679547e Michael Hanselmann
           for i in range(3)]
2350 4679547e Michael Hanselmann
2351 4679547e Michael Hanselmann
    # Create job
2352 4679547e Michael Hanselmann
    job_id = 3499
2353 4679547e Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2354 4679547e Michael Hanselmann
2355 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2356 4679547e Michael Hanselmann
2357 4679547e Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2358 4679547e Michael Hanselmann
2359 4679547e Michael Hanselmann
    # Run first opcode
2360 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2361 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2362 4679547e Michael Hanselmann
2363 4679547e Michael Hanselmann
    # Job goes back to queued
2364 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2365 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2366 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2367 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2368 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
2369 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2370 4679547e Michael Hanselmann
                      ["Res0", None, None]])
2371 4679547e Michael Hanselmann
2372 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2373 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2374 4679547e Michael Hanselmann
2375 4679547e Michael Hanselmann
    # Change priority
2376 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(-10),
2377 4679547e Michael Hanselmann
                     (True,
2378 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2379 4679547e Michael Hanselmann
                       " been changed to -10")))
2380 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2381 4679547e Michael Hanselmann
2382 4679547e Michael Hanselmann
    # Process second opcode
2383 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2384 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2385 4679547e Michael Hanselmann
2386 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), -10)
2387 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2388 4679547e Michael Hanselmann
2389 4679547e Michael Hanselmann
    # Check status
2390 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2391 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2392 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2393 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2394 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2395 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2396 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2397 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2398 4679547e Michael Hanselmann
                      ["Res0", "Res1", None]])
2399 4679547e Michael Hanselmann
2400 4679547e Michael Hanselmann
    # Change priority once more
2401 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(5),
2402 4679547e Michael Hanselmann
                     (True,
2403 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2404 4679547e Michael Hanselmann
                       " been changed to 5")))
2405 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), 5)
2406 4679547e Michael Hanselmann
2407 4679547e Michael Hanselmann
    # Process third opcode
2408 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2409 4679547e Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2410 4679547e Michael Hanselmann
2411 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), 5)
2412 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2413 4679547e Michael Hanselmann
2414 4679547e Michael Hanselmann
    # Check status
2415 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2416 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2417 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2418 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2419 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2420 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2421 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2422 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS],
2423 4679547e Michael Hanselmann
                      ["Res0", "Res1", "Res2"]])
2424 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2425 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2426 4679547e Michael Hanselmann
2427 4679547e Michael Hanselmann
2428 1b5150ab Michael Hanselmann
class _IdOnlyFakeJob:
2429 1b5150ab Michael Hanselmann
  def __init__(self, job_id, priority=NotImplemented):
2430 1b5150ab Michael Hanselmann
    self.id = str(job_id)
2431 1b5150ab Michael Hanselmann
    self._priority = priority
2432 1b5150ab Michael Hanselmann
2433 1b5150ab Michael Hanselmann
  def CalcPriority(self):
2434 1b5150ab Michael Hanselmann
    return self._priority
2435 b95479a5 Michael Hanselmann
2436 1b5150ab Michael Hanselmann
2437 1b5150ab Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
2438 b95479a5 Michael Hanselmann
  def setUp(self):
2439 b95479a5 Michael Hanselmann
    self._status = []
2440 b95479a5 Michael Hanselmann
    self._queue = []
2441 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2442 b95479a5 Michael Hanselmann
2443 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
2444 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
2445 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
2446 b95479a5 Michael Hanselmann
    return result
2447 b95479a5 Michael Hanselmann
2448 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
2449 37d76f1e Michael Hanselmann
    self.assertFalse(self.jdm._lock.is_owned(),
2450 37d76f1e Michael Hanselmann
                     msg=("Must not own manager lock while re-adding jobs"
2451 37d76f1e Michael Hanselmann
                          " (potential deadlock)"))
2452 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
2453 b95479a5 Michael Hanselmann
2454 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
2455 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(17697)
2456 b95479a5 Michael Hanselmann
    job_id = str(28625)
2457 b95479a5 Michael Hanselmann
2458 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2459 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2460 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2461 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2462 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2463 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2464 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2465 b95479a5 Michael Hanselmann
      job_id: set([job]),
2466 b95479a5 Michael Hanselmann
      })
2467 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2468 fcb21ad7 Michael Hanselmann
      ("job/28625", None, None, [("job", [job.id])])
2469 fcb21ad7 Michael Hanselmann
      ])
2470 b95479a5 Michael Hanselmann
2471 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2472 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2473 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
2474 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2475 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2476 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2477 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2478 b95479a5 Michael Hanselmann
2479 942e2262 Michael Hanselmann
  def testNotFinalizedThenQueued(self):
2480 942e2262 Michael Hanselmann
    # This can happen on a queue shutdown
2481 942e2262 Michael Hanselmann
    job = _IdOnlyFakeJob(1320)
2482 942e2262 Michael Hanselmann
    job_id = str(22971)
2483 942e2262 Michael Hanselmann
2484 942e2262 Michael Hanselmann
    for i in range(5):
2485 942e2262 Michael Hanselmann
      if i > 2:
2486 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2487 942e2262 Michael Hanselmann
      else:
2488 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2489 942e2262 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2490 942e2262 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2491 942e2262 Michael Hanselmann
      self.assertFalse(self._status)
2492 942e2262 Michael Hanselmann
      self.assertFalse(self._queue)
2493 942e2262 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2494 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2495 942e2262 Michael Hanselmann
        job_id: set([job]),
2496 942e2262 Michael Hanselmann
        })
2497 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2498 942e2262 Michael Hanselmann
        ("job/22971", None, None, [("job", [job.id])])
2499 942e2262 Michael Hanselmann
        ])
2500 942e2262 Michael Hanselmann
2501 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
2502 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(5278)
2503 b95479a5 Michael Hanselmann
    job_id = str(9610)
2504 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
2505 b95479a5 Michael Hanselmann
2506 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2507 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2508 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2509 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2510 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2511 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2512 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2513 b95479a5 Michael Hanselmann
      job_id: set([job]),
2514 b95479a5 Michael Hanselmann
      })
2515 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2516 fcb21ad7 Michael Hanselmann
      ("job/9610", None, None, [("job", [job.id])])
2517 fcb21ad7 Michael Hanselmann
      ])
2518 b95479a5 Michael Hanselmann
2519 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2520 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2521 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2522 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2523 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2524 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2525 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2526 b95479a5 Michael Hanselmann
2527 b95479a5 Michael Hanselmann
  def testRequireError(self):
2528 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(21459)
2529 b95479a5 Michael Hanselmann
    job_id = str(25519)
2530 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
2531 b95479a5 Michael Hanselmann
2532 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2533 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2534 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2535 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2536 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2537 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2538 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2539 b95479a5 Michael Hanselmann
      job_id: set([job]),
2540 b95479a5 Michael Hanselmann
      })
2541 b95479a5 Michael Hanselmann
2542 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2543 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2544 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2545 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2546 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2547 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2548 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2549 b95479a5 Michael Hanselmann
2550 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
2551 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
2552 b95479a5 Michael Hanselmann
2553 b95479a5 Michael Hanselmann
    for end_status in dep_status:
2554 1b5150ab Michael Hanselmann
      job = _IdOnlyFakeJob(21343)
2555 b95479a5 Michael Hanselmann
      job_id = str(14609)
2556 b95479a5 Michael Hanselmann
2557 47099cd1 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2558 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2559 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2560 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2561 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2562 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2563 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2564 b95479a5 Michael Hanselmann
        job_id: set([job]),
2565 b95479a5 Michael Hanselmann
        })
2566 fcb21ad7 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2567 fcb21ad7 Michael Hanselmann
        ("job/14609", None, None, [("job", [job.id])])
2568 fcb21ad7 Michael Hanselmann
        ])
2569 b95479a5 Michael Hanselmann
2570 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
2571 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2572 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
2573 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2574 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2575 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
2576 fcb21ad7 Michael Hanselmann
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2577 b95479a5 Michael Hanselmann
2578 b95479a5 Michael Hanselmann
  def testNotify(self):
2579 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(8227)
2580 b95479a5 Michael Hanselmann
    job_id = str(4113)
2581 b95479a5 Michael Hanselmann
2582 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2583 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2584 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2585 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2586 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2587 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2588 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2589 b95479a5 Michael Hanselmann
      job_id: set([job]),
2590 b95479a5 Michael Hanselmann
      })
2591 b95479a5 Michael Hanselmann
2592 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
2593 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2594 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2595 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2596 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
2597 b95479a5 Michael Hanselmann
2598 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
2599 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(10102)
2600 b95479a5 Michael Hanselmann
    job_id = str(1271)
2601 b95479a5 Michael Hanselmann
2602 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2603 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2604 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2605 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2606 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2607 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2608 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2609 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2610 b95479a5 Michael Hanselmann
      job_id: set([job]),
2611 b95479a5 Michael Hanselmann
      })
2612 b95479a5 Michael Hanselmann
2613 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2614 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2615 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2616 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2617 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2618 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2619 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2620 b95479a5 Michael Hanselmann
2621 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2622 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(24273)
2623 b95479a5 Michael Hanselmann
    job_id = str(23885)
2624 b95479a5 Michael Hanselmann
2625 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2626 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2627 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2628 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2629 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2630 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2631 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2632 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2633 b95479a5 Michael Hanselmann
      job_id: set([job]),
2634 b95479a5 Michael Hanselmann
      })
2635 b95479a5 Michael Hanselmann
2636 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2637 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2638 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2639 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2640 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2641 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2642 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2643 b95479a5 Michael Hanselmann
2644 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2645 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(224)
2646 b95479a5 Michael Hanselmann
    job_id = str(3081)
2647 b95479a5 Michael Hanselmann
2648 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2649 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2650 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2651 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2652 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2653 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2654 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2655 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2656 b95479a5 Michael Hanselmann
      job_id: set(),
2657 b95479a5 Michael Hanselmann
      })
2658 b95479a5 Michael Hanselmann
2659 b95479a5 Michael Hanselmann
    # Force cleanup
2660 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2661 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2662 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2663 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2664 b95479a5 Michael Hanselmann
2665 fcb21ad7 Michael Hanselmann
  def testMultipleWaiting(self):
2666 fcb21ad7 Michael Hanselmann
    # Use a deterministic random generator
2667 fcb21ad7 Michael Hanselmann
    rnd = random.Random(21402)
2668 fcb21ad7 Michael Hanselmann
2669 fcb21ad7 Michael Hanselmann
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2670 fcb21ad7 Michael Hanselmann
2671 fcb21ad7 Michael Hanselmann
    waiters = dict((job_ids.pop(),
2672 1b5150ab Michael Hanselmann
                    set(map(_IdOnlyFakeJob,
2673 fcb21ad7 Michael Hanselmann
                            [job_ids.pop()
2674 fcb21ad7 Michael Hanselmann
                             for _ in range(rnd.randint(1, 20))])))
2675 fcb21ad7 Michael Hanselmann
                   for _ in range(10))
2676 fcb21ad7 Michael Hanselmann
2677 fcb21ad7 Michael Hanselmann
    # Ensure there are no duplicate job IDs
2678 fcb21ad7 Michael Hanselmann
    assert not utils.FindDuplicates(waiters.keys() +
2679 fcb21ad7 Michael Hanselmann
                                    [job.id
2680 fcb21ad7 Michael Hanselmann
                                     for jobs in waiters.values()
2681 fcb21ad7 Michael Hanselmann
                                     for job in jobs])
2682 fcb21ad7 Michael Hanselmann
2683 fcb21ad7 Michael Hanselmann
    # Register all jobs as waiters
2684 fcb21ad7 Michael Hanselmann
    for job_id, job in [(job_id, job)
2685 fcb21ad7 Michael Hanselmann
                        for (job_id, jobs) in waiters.items()
2686 fcb21ad7 Michael Hanselmann
                        for job in jobs]:
2687 fcb21ad7 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2688 fcb21ad7 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2689 fcb21ad7 Michael Hanselmann
                                              [constants.JOB_STATUS_SUCCESS])
2690 fcb21ad7 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2691 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._status)
2692 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._queue)
2693 fcb21ad7 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2694 fcb21ad7 Michael Hanselmann
2695 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, waiters)
2696 fcb21ad7 Michael Hanselmann
2697 fcb21ad7 Michael Hanselmann
    def _MakeSet((name, mode, owner_names, pending)):
2698 fcb21ad7 Michael Hanselmann
      return (name, mode, owner_names,
2699 fcb21ad7 Michael Hanselmann
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2700 fcb21ad7 Michael Hanselmann
2701 fcb21ad7 Michael Hanselmann
    def _CheckLockInfo():
2702 fcb21ad7 Michael Hanselmann
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2703 fcb21ad7 Michael Hanselmann
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2704 fcb21ad7 Michael Hanselmann
        ("job/%s" % job_id, None, None,
2705 fcb21ad7 Michael Hanselmann
         [("job", set([job.id for job in jobs]))])
2706 fcb21ad7 Michael Hanselmann
        for job_id, jobs in waiters.items()
2707 fcb21ad7 Michael Hanselmann
        if jobs
2708 fcb21ad7 Michael Hanselmann
        ]))
2709 fcb21ad7 Michael Hanselmann
2710 fcb21ad7 Michael Hanselmann
    _CheckLockInfo()
2711 fcb21ad7 Michael Hanselmann
2712 fcb21ad7 Michael Hanselmann
    # Notify in random order
2713 fcb21ad7 Michael Hanselmann
    for job_id in rnd.sample(waiters, len(waiters)):
2714 fcb21ad7 Michael Hanselmann
      # Remove from pending waiter list
2715 fcb21ad7 Michael Hanselmann
      jobs = waiters.pop(job_id)
2716 fcb21ad7 Michael Hanselmann
      for job in jobs:
2717 fcb21ad7 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2718 fcb21ad7 Michael Hanselmann
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2719 fcb21ad7 Michael Hanselmann
                                                [constants.JOB_STATUS_SUCCESS])
2720 fcb21ad7 Michael Hanselmann
        self.assertEqual(result, self.jdm.CONTINUE)
2721 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._status)
2722 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._queue)
2723 fcb21ad7 Michael Hanselmann
        self.assertFalse(self.jdm.JobWaiting(job))
2724 fcb21ad7 Michael Hanselmann
2725 fcb21ad7 Michael Hanselmann
      _CheckLockInfo()
2726 fcb21ad7 Michael Hanselmann
2727 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2728 fcb21ad7 Michael Hanselmann
2729 fcb21ad7 Michael Hanselmann
    assert not waiters
2730 fcb21ad7 Michael Hanselmann
2731 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2732 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(18937)
2733 b95479a5 Michael Hanselmann
2734 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2735 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2736 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2737 b95479a5 Michael Hanselmann
2738 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2739 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(30540)
2740 b95479a5 Michael Hanselmann
    job_id = str(23769)
2741 b95479a5 Michael Hanselmann
2742 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2743 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2744 b95479a5 Michael Hanselmann
2745 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2746 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2747 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2748 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2749 fcb21ad7 Michael Hanselmann
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2750 b95479a5 Michael Hanselmann
2751 b95479a5 Michael Hanselmann
2752 989a8bee Michael Hanselmann
if __name__ == "__main__":
2753 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()