Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.jqueue_unittest.py @ 560ef132

History | View | Annotate | Download (95.3 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 76b62028 Iustin Pop
# Copyright (C) 2010, 2011, 2012 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 4679547e Michael Hanselmann
import operator
33 989a8bee Michael Hanselmann
34 383477e9 Michael Hanselmann
try:
35 383477e9 Michael Hanselmann
  # pylint: disable=E0611
36 383477e9 Michael Hanselmann
  from pyinotify import pyinotify
37 383477e9 Michael Hanselmann
except ImportError:
38 383477e9 Michael Hanselmann
  import pyinotify
39 383477e9 Michael Hanselmann
40 989a8bee Michael Hanselmann
from ganeti import constants
41 989a8bee Michael Hanselmann
from ganeti import utils
42 989a8bee Michael Hanselmann
from ganeti import errors
43 989a8bee Michael Hanselmann
from ganeti import jqueue
44 8f5c488d Michael Hanselmann
from ganeti import opcodes
45 8f5c488d Michael Hanselmann
from ganeti import compat
46 26d3fd2f Michael Hanselmann
from ganeti import mcpu
47 fcb21ad7 Michael Hanselmann
from ganeti import query
48 df5a5730 Michael Hanselmann
from ganeti import workerpool
49 989a8bee Michael Hanselmann
50 989a8bee Michael Hanselmann
import testutils
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
53 989a8bee Michael Hanselmann
class _FakeJob:
54 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
55 989a8bee Michael Hanselmann
    self.id = job_id
56 c0f6d0d8 Michael Hanselmann
    self.writable = False
57 989a8bee Michael Hanselmann
    self._status = status
58 989a8bee Michael Hanselmann
    self._log = []
59 989a8bee Michael Hanselmann
60 989a8bee Michael Hanselmann
  def SetStatus(self, status):
61 989a8bee Michael Hanselmann
    self._status = status
62 989a8bee Michael Hanselmann
63 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
64 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
65 989a8bee Michael Hanselmann
66 989a8bee Michael Hanselmann
  def CalcStatus(self):
67 989a8bee Michael Hanselmann
    return self._status
68 989a8bee Michael Hanselmann
69 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
70 989a8bee Michael Hanselmann
    result = []
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    for name in fields:
73 989a8bee Michael Hanselmann
      if name == "status":
74 989a8bee Michael Hanselmann
        result.append(self._status)
75 989a8bee Michael Hanselmann
      else:
76 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
    return result
79 989a8bee Michael Hanselmann
80 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
81 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
82 989a8bee Michael Hanselmann
83 989a8bee Michael Hanselmann
    if newer_than is None:
84 989a8bee Michael Hanselmann
      return self._log
85 989a8bee Michael Hanselmann
86 989a8bee Michael Hanselmann
    return self._log[newer_than:]
87 989a8bee Michael Hanselmann
88 989a8bee Michael Hanselmann
89 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
90 989a8bee Michael Hanselmann
  def testStatus(self):
91 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
92 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
93 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
94 989a8bee Michael Hanselmann
95 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
96 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
97 989a8bee Michael Hanselmann
98 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
99 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
100 989a8bee Michael Hanselmann
101 989a8bee Michael Hanselmann
    # job.id is used by checker
102 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
103 989a8bee Michael Hanselmann
104 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
105 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
106 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
107 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
108 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
109 989a8bee Michael Hanselmann
110 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
111 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112 989a8bee Michael Hanselmann
113 989a8bee Michael Hanselmann
  def testFinalStatus(self):
114 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
115 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
116 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
117 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
118 989a8bee Michael Hanselmann
      # a change immediately
119 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
120 989a8bee Michael Hanselmann
121 989a8bee Michael Hanselmann
  def testLog(self):
122 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
123 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
124 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
127 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
128 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
129 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
132 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
133 989a8bee Michael Hanselmann
134 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
135 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
136 989a8bee Michael Hanselmann
137 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
138 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
139 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
140 989a8bee Michael Hanselmann
141 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
142 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
143 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
144 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
145 989a8bee Michael Hanselmann
146 989a8bee Michael Hanselmann
147 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
148 989a8bee Michael Hanselmann
  def setUp(self):
149 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
150 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
151 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
152 989a8bee Michael Hanselmann
153 989a8bee Michael Hanselmann
  def tearDown(self):
154 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
155 989a8bee Michael Hanselmann
156 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
157 989a8bee Michael Hanselmann
    try:
158 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
159 989a8bee Michael Hanselmann
    except EnvironmentError, err:
160 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
161 989a8bee Michael Hanselmann
    else:
162 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
163 989a8bee Michael Hanselmann
164 989a8bee Michael Hanselmann
  def testClose(self):
165 989a8bee Michael Hanselmann
    for wait in [False, True]:
166 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
167 989a8bee Michael Hanselmann
      try:
168 989a8bee Michael Hanselmann
        if wait:
169 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
170 989a8bee Michael Hanselmann
      finally:
171 989a8bee Michael Hanselmann
        waiter.Close()
172 989a8bee Michael Hanselmann
173 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
174 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
175 989a8bee Michael Hanselmann
176 989a8bee Michael Hanselmann
  def testChangingFile(self):
177 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
178 989a8bee Michael Hanselmann
    try:
179 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
180 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
181 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
182 989a8bee Michael Hanselmann
    finally:
183 989a8bee Michael Hanselmann
      waiter.Close()
184 989a8bee Michael Hanselmann
185 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
186 989a8bee Michael Hanselmann
187 989a8bee Michael Hanselmann
  def testChangingFile2(self):
188 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
189 989a8bee Michael Hanselmann
    try:
190 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
191 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
192 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
193 989a8bee Michael Hanselmann
194 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
195 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
196 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
197 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
198 989a8bee Michael Hanselmann
    finally:
199 989a8bee Michael Hanselmann
      waiter.Close()
200 989a8bee Michael Hanselmann
201 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
202 989a8bee Michael Hanselmann
203 989a8bee Michael Hanselmann
204 383477e9 Michael Hanselmann
class _FailingWatchManager(pyinotify.WatchManager):
205 383477e9 Michael Hanselmann
  """Subclass of L{pyinotify.WatchManager} which always fails to register.
206 383477e9 Michael Hanselmann

207 383477e9 Michael Hanselmann
  """
208 383477e9 Michael Hanselmann
  def add_watch(self, filename, mask):
209 383477e9 Michael Hanselmann
    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
210 383477e9 Michael Hanselmann
                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
211 383477e9 Michael Hanselmann
212 383477e9 Michael Hanselmann
    return {
213 383477e9 Michael Hanselmann
      filename: -1,
214 383477e9 Michael Hanselmann
      }
215 383477e9 Michael Hanselmann
216 383477e9 Michael Hanselmann
217 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
218 989a8bee Michael Hanselmann
  def setUp(self):
219 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
220 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
221 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
222 989a8bee Michael Hanselmann
223 989a8bee Michael Hanselmann
  def tearDown(self):
224 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
227 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
228 989a8bee Michael Hanselmann
229 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
230 989a8bee Michael Hanselmann
    return None
231 989a8bee Michael Hanselmann
232 989a8bee Michael Hanselmann
  def testNoChanges(self):
233 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
234 989a8bee Michael Hanselmann
235 989a8bee Michael Hanselmann
    # No change
236 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
237 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
238 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
239 989a8bee Michael Hanselmann
240 989a8bee Michael Hanselmann
    # No previous information
241 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
242 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
243 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
244 989a8bee Michael Hanselmann
245 989a8bee Michael Hanselmann
  def testLostJob(self):
246 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
247 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
248 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
249 989a8bee Michael Hanselmann
250 383477e9 Michael Hanselmann
  def testNonExistentFile(self):
251 383477e9 Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
252 383477e9 Michael Hanselmann
253 383477e9 Michael Hanselmann
    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
254 383477e9 Michael Hanselmann
    self.assertFalse(os.path.exists(filename))
255 383477e9 Michael Hanselmann
256 383477e9 Michael Hanselmann
    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
257 383477e9 Michael Hanselmann
                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
258 383477e9 Michael Hanselmann
                                             _waiter_cls=NotImplemented))
259 383477e9 Michael Hanselmann
    self.assertTrue(result is None)
260 383477e9 Michael Hanselmann
261 383477e9 Michael Hanselmann
  def testInotifyError(self):
262 383477e9 Michael Hanselmann
    jobfile_waiter_cls = \
263 383477e9 Michael Hanselmann
      compat.partial(jqueue._JobFileChangesWaiter,
264 383477e9 Michael Hanselmann
                     _inotify_wm_cls=_FailingWatchManager)
265 383477e9 Michael Hanselmann
266 383477e9 Michael Hanselmann
    jobchange_waiter_cls = \
267 383477e9 Michael Hanselmann
      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
268 383477e9 Michael Hanselmann
269 383477e9 Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
270 383477e9 Michael Hanselmann
271 383477e9 Michael Hanselmann
    # Test if failing to watch a job file (e.g. due to
272 383477e9 Michael Hanselmann
    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
273 383477e9 Michael Hanselmann
    self.assertRaises(errors.InotifyError, wfjc,
274 383477e9 Michael Hanselmann
                      self.filename, self._LoadWaitingJob,
275 383477e9 Michael Hanselmann
                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
276 383477e9 Michael Hanselmann
                      _waiter_cls=jobchange_waiter_cls)
277 383477e9 Michael Hanselmann
278 989a8bee Michael Hanselmann
279 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
280 6760e4ed Michael Hanselmann
  def test(self):
281 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
282 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
283 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
284 6760e4ed Michael Hanselmann
285 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
286 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
287 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
288 6760e4ed Michael Hanselmann
289 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
290 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
291 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
292 6760e4ed Michael Hanselmann
293 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
294 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
295 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
296 6760e4ed Michael Hanselmann
297 6760e4ed Michael Hanselmann
298 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
299 8f5c488d Michael Hanselmann
  def testDefaults(self):
300 8f5c488d Michael Hanselmann
    def _Check(op):
301 a9e1819b Klaus Aehlig
      self.assertFalse(op.input.dry_run)
302 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
303 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
304 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
305 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
306 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
307 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
308 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
309 8f5c488d Michael Hanselmann
310 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
311 8f5c488d Michael Hanselmann
    _Check(op1)
312 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
313 8f5c488d Michael Hanselmann
    _Check(op2)
314 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
315 8f5c488d Michael Hanselmann
316 8f5c488d Michael Hanselmann
  def testPriority(self):
317 8f5c488d Michael Hanselmann
    def _Check(op):
318 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
319 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
320 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
321 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
322 8f5c488d Michael Hanselmann
323 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
324 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
325 8f5c488d Michael Hanselmann
    _Check(op1)
326 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
327 8f5c488d Michael Hanselmann
    _Check(op2)
328 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
329 8f5c488d Michael Hanselmann
330 8f5c488d Michael Hanselmann
331 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
332 4679547e Michael Hanselmann
  def testNoOpCodes(self):
333 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
334 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
335 5f6b0b71 Michael Hanselmann
336 8f5c488d Michael Hanselmann
  def testDefaults(self):
337 8f5c488d Michael Hanselmann
    job_id = 4260
338 8f5c488d Michael Hanselmann
    ops = [
339 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
340 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
341 8f5c488d Michael Hanselmann
      ]
342 8f5c488d Michael Hanselmann
343 8f5c488d Michael Hanselmann
    def _Check(job):
344 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
345 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
346 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
347 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
348 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
349 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
350 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
351 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
352 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
353 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
354 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
355 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
356 a06c6ae8 Michael Hanselmann
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
357 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
358 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
359 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
360 8a3cd185 Michael Hanselmann
      self.assertFalse(job.archived)
361 8f5c488d Michael Hanselmann
362 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
363 8f5c488d Michael Hanselmann
    _Check(job1)
364 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
365 8f5c488d Michael Hanselmann
    _Check(job2)
366 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
367 8f5c488d Michael Hanselmann
368 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
369 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
370 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
371 c0f6d0d8 Michael Hanselmann
372 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
373 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
374 c0f6d0d8 Michael Hanselmann
375 8a3cd185 Michael Hanselmann
  def testArchived(self):
376 8a3cd185 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
377 8a3cd185 Michael Hanselmann
    self.assertFalse(job.archived)
378 8a3cd185 Michael Hanselmann
379 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
380 8a3cd185 Michael Hanselmann
    self.assertTrue(newjob.archived)
381 8a3cd185 Michael Hanselmann
382 8a3cd185 Michael Hanselmann
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
383 8a3cd185 Michael Hanselmann
    self.assertFalse(newjob2.archived)
384 8a3cd185 Michael Hanselmann
385 8f5c488d Michael Hanselmann
  def testPriority(self):
386 8f5c488d Michael Hanselmann
    job_id = 4283
387 8f5c488d Michael Hanselmann
    ops = [
388 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
389 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
390 8f5c488d Michael Hanselmann
      ]
391 8f5c488d Michael Hanselmann
392 8f5c488d Michael Hanselmann
    def _Check(job):
393 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
394 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
395 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
396 8f5c488d Michael Hanselmann
397 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
398 8f5c488d Michael Hanselmann
    _Check(job)
399 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
400 8f5c488d Michael Hanselmann
                            for op in job.ops))
401 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
402 8f5c488d Michael Hanselmann
403 8f5c488d Michael Hanselmann
    # Increase first
404 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
405 8f5c488d Michael Hanselmann
    _Check(job)
406 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
407 8f5c488d Michael Hanselmann
408 8f5c488d Michael Hanselmann
    # Mark opcode as finished
409 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
410 8f5c488d Michael Hanselmann
    _Check(job)
411 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
412 8f5c488d Michael Hanselmann
413 8f5c488d Michael Hanselmann
    # Increase second
414 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
415 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
416 8f5c488d Michael Hanselmann
417 8f5c488d Michael Hanselmann
    # Test increasing first
418 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
419 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
420 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
421 8f5c488d Michael Hanselmann
422 4679547e Michael Hanselmann
  def _JobForPriority(self, job_id):
423 4679547e Michael Hanselmann
    ops = [
424 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
425 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
426 4679547e Michael Hanselmann
      opcodes.OpTagsGet(),
427 4679547e Michael Hanselmann
      opcodes.OpTestDelay(),
428 4679547e Michael Hanselmann
      ]
429 4679547e Michael Hanselmann
430 4679547e Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
431 4679547e Michael Hanselmann
432 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
433 4679547e Michael Hanselmann
                               for op in job.ops))
434 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
435 4679547e Michael Hanselmann
436 4679547e Michael Hanselmann
    return job
437 4679547e Michael Hanselmann
438 4679547e Michael Hanselmann
  def testChangePriorityAllQueued(self):
439 4679547e Michael Hanselmann
    job = self._JobForPriority(24984)
440 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
441 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
442 4679547e Michael Hanselmann
                               for op in job.ops))
443 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
444 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
445 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
446 4679547e Michael Hanselmann
    self.assertEqual(result,
447 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job 24984 have"
448 4679547e Michael Hanselmann
                             " been changed to -10")))
449 4679547e Michael Hanselmann
450 4679547e Michael Hanselmann
  def testChangePriorityAllFinished(self):
451 4679547e Michael Hanselmann
    job = self._JobForPriority(16405)
452 4679547e Michael Hanselmann
453 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
454 4679547e Michael Hanselmann
      if idx > 2:
455 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
456 4679547e Michael Hanselmann
      else:
457 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
458 4679547e Michael Hanselmann
459 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
460 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
461 4679547e Michael Hanselmann
    result = job.ChangePriority(-10)
462 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
463 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
464 4679547e Michael Hanselmann
                               for op in job.ops))
465 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
466 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
467 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
468 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
469 4679547e Michael Hanselmann
      constants.OP_STATUS_ERROR,
470 4679547e Michael Hanselmann
      ])
471 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 16405 is finished"))
472 4679547e Michael Hanselmann
473 4679547e Michael Hanselmann
  def testChangePriorityCancelling(self):
474 4679547e Michael Hanselmann
    job = self._JobForPriority(31572)
475 4679547e Michael Hanselmann
476 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
477 4679547e Michael Hanselmann
      if idx > 1:
478 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
479 4679547e Michael Hanselmann
      else:
480 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
481 4679547e Michael Hanselmann
482 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
483 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
484 4679547e Michael Hanselmann
    result = job.ChangePriority(5)
485 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
486 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
487 4679547e Michael Hanselmann
                               for op in job.ops))
488 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
489 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
490 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
491 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
492 4679547e Michael Hanselmann
      constants.OP_STATUS_CANCELING,
493 4679547e Michael Hanselmann
      ])
494 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
495 4679547e Michael Hanselmann
496 4679547e Michael Hanselmann
  def testChangePriorityFirstRunning(self):
497 4679547e Michael Hanselmann
    job = self._JobForPriority(1716215889)
498 4679547e Michael Hanselmann
499 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
500 4679547e Michael Hanselmann
      if idx == 0:
501 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
502 4679547e Michael Hanselmann
      else:
503 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
504 4679547e Michael Hanselmann
505 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
506 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
507 4679547e Michael Hanselmann
    result = job.ChangePriority(7)
508 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
509 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
510 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
511 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
512 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
513 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
514 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
515 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
516 4679547e Michael Hanselmann
      ])
517 4679547e Michael Hanselmann
    self.assertEqual(result,
518 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
519 4679547e Michael Hanselmann
                             " 1716215889 have been changed to 7")))
520 4679547e Michael Hanselmann
521 4679547e Michael Hanselmann
  def testChangePriorityLastRunning(self):
522 4679547e Michael Hanselmann
    job = self._JobForPriority(1308)
523 4679547e Michael Hanselmann
524 4679547e Michael Hanselmann
    for (idx, op) in enumerate(job.ops):
525 4679547e Michael Hanselmann
      if idx == (len(job.ops) - 1):
526 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_RUNNING
527 4679547e Michael Hanselmann
      else:
528 4679547e Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
529 4679547e Michael Hanselmann
530 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
531 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
532 4679547e Michael Hanselmann
    result = job.ChangePriority(-3)
533 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
534 4679547e Michael Hanselmann
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
535 4679547e Michael Hanselmann
                               for op in job.ops))
536 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
537 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
538 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
539 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
540 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
541 4679547e Michael Hanselmann
      ])
542 4679547e Michael Hanselmann
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
543 4679547e Michael Hanselmann
544 4679547e Michael Hanselmann
  def testChangePrioritySecondOpcodeRunning(self):
545 4679547e Michael Hanselmann
    job = self._JobForPriority(27701)
546 4679547e Michael Hanselmann
547 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
548 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
549 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
550 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
551 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_QUEUED
552 4679547e Michael Hanselmann
553 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
554 4679547e Michael Hanselmann
    result = job.ChangePriority(-19)
555 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -19)
556 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
557 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
558 4679547e Michael Hanselmann
                      -19, -19])
559 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
560 4679547e Michael Hanselmann
      constants.OP_STATUS_SUCCESS,
561 4679547e Michael Hanselmann
      constants.OP_STATUS_RUNNING,
562 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
563 4679547e Michael Hanselmann
      constants.OP_STATUS_QUEUED,
564 4679547e Michael Hanselmann
      ])
565 4679547e Michael Hanselmann
    self.assertEqual(result,
566 4679547e Michael Hanselmann
                     (True, ("Priorities of pending opcodes for job"
567 4679547e Michael Hanselmann
                             " 27701 have been changed to -19")))
568 4679547e Michael Hanselmann
569 4679547e Michael Hanselmann
  def testChangePriorityWithInconsistentJob(self):
570 4679547e Michael Hanselmann
    job = self._JobForPriority(30097)
571 4679547e Michael Hanselmann
572 4679547e Michael Hanselmann
    self.assertEqual(len(job.ops), 4)
573 4679547e Michael Hanselmann
574 4679547e Michael Hanselmann
    # This job is invalid (as it has two opcodes marked as running) and make
575 4679547e Michael Hanselmann
    # the call fail because an unprocessed opcode precedes a running one (which
576 4679547e Michael Hanselmann
    # should never happen in reality)
577 4679547e Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
578 4679547e Michael Hanselmann
    job.ops[1].status = constants.OP_STATUS_RUNNING
579 4679547e Michael Hanselmann
    job.ops[2].status = constants.OP_STATUS_QUEUED
580 4679547e Michael Hanselmann
    job.ops[3].status = constants.OP_STATUS_RUNNING
581 4679547e Michael Hanselmann
582 4679547e Michael Hanselmann
    self.assertRaises(AssertionError, job.ChangePriority, 19)
583 4679547e Michael Hanselmann
584 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
585 db5bce34 Michael Hanselmann
    def _Queued(ops):
586 db5bce34 Michael Hanselmann
      # The default status is "queued"
587 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
588 db5bce34 Michael Hanselmann
                              for op in ops))
589 db5bce34 Michael Hanselmann
590 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
591 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
592 db5bce34 Michael Hanselmann
593 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
594 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
595 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
596 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
597 db5bce34 Michael Hanselmann
598 db5bce34 Michael Hanselmann
    def _Running(ops):
599 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
600 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
601 db5bce34 Michael Hanselmann
      for op in ops[2:]:
602 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
603 db5bce34 Michael Hanselmann
604 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
605 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
606 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
607 db5bce34 Michael Hanselmann
      for op in ops[2:]:
608 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
609 db5bce34 Michael Hanselmann
610 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
611 db5bce34 Michael Hanselmann
      for op in ops:
612 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
613 db5bce34 Michael Hanselmann
614 db5bce34 Michael Hanselmann
    def _Canceled(ops):
615 db5bce34 Michael Hanselmann
      for op in ops:
616 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
617 db5bce34 Michael Hanselmann
618 db5bce34 Michael Hanselmann
    def _Error1(ops):
619 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
620 db5bce34 Michael Hanselmann
        if idx > 3:
621 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
622 db5bce34 Michael Hanselmann
        else:
623 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
624 db5bce34 Michael Hanselmann
625 db5bce34 Michael Hanselmann
    def _Error2(ops):
626 db5bce34 Michael Hanselmann
      for op in ops:
627 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
628 db5bce34 Michael Hanselmann
629 db5bce34 Michael Hanselmann
    def _Success(ops):
630 db5bce34 Michael Hanselmann
      for op in ops:
631 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
632 db5bce34 Michael Hanselmann
633 db5bce34 Michael Hanselmann
    tests = {
634 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
635 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
636 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
637 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
638 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
639 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
640 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
641 db5bce34 Michael Hanselmann
      }
642 db5bce34 Michael Hanselmann
643 db5bce34 Michael Hanselmann
    def _NewJob():
644 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
645 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
646 c0f6d0d8 Michael Hanselmann
                              True)
647 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
648 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
649 db5bce34 Michael Hanselmann
                              for op in job.ops))
650 db5bce34 Michael Hanselmann
      return job
651 db5bce34 Michael Hanselmann
652 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
653 db5bce34 Michael Hanselmann
      sttests = tests[status]
654 db5bce34 Michael Hanselmann
      assert sttests
655 db5bce34 Michael Hanselmann
      for fn in sttests:
656 db5bce34 Michael Hanselmann
        job = _NewJob()
657 db5bce34 Michael Hanselmann
        fn(job.ops)
658 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
659 db5bce34 Michael Hanselmann
660 8f5c488d Michael Hanselmann
661 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
662 be760ba8 Michael Hanselmann
  def __init__(self):
663 b95479a5 Michael Hanselmann
    self._checks = []
664 b95479a5 Michael Hanselmann
    self._notifications = []
665 b95479a5 Michael Hanselmann
    self._waiting = set()
666 b95479a5 Michael Hanselmann
667 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
668 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
669 b95479a5 Michael Hanselmann
670 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
671 b95479a5 Michael Hanselmann
    return len(self._checks)
672 b95479a5 Michael Hanselmann
673 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
674 b95479a5 Michael Hanselmann
    return len(self._waiting)
675 b95479a5 Michael Hanselmann
676 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
677 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
678 b95479a5 Michael Hanselmann
679 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
680 b95479a5 Michael Hanselmann
    return job in self._waiting
681 b95479a5 Michael Hanselmann
682 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
683 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
684 b95479a5 Michael Hanselmann
685 b95479a5 Michael Hanselmann
    assert exp_job == job
686 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
687 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
688 b95479a5 Michael Hanselmann
689 b95479a5 Michael Hanselmann
    (result_status, _) = result
690 b95479a5 Michael Hanselmann
691 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
692 b95479a5 Michael Hanselmann
      self._waiting.add(job)
693 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
694 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
695 b95479a5 Michael Hanselmann
696 b95479a5 Michael Hanselmann
    return result
697 b95479a5 Michael Hanselmann
698 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
699 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
700 b95479a5 Michael Hanselmann
701 b95479a5 Michael Hanselmann
702 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
703 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
704 b95479a5 Michael Hanselmann
    return False
705 b95479a5 Michael Hanselmann
706 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
707 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
708 b95479a5 Michael Hanselmann
709 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
710 b95479a5 Michael Hanselmann
    pass
711 b95479a5 Michael Hanselmann
712 b95479a5 Michael Hanselmann
713 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
714 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
715 be760ba8 Michael Hanselmann
    self._acquired = False
716 ebb2a2a3 Michael Hanselmann
    self._updates = []
717 6a373640 Michael Hanselmann
    self._submitted = []
718 942e2262 Michael Hanselmann
    self._accepting_jobs = True
719 6a373640 Michael Hanselmann
720 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
721 be760ba8 Michael Hanselmann
722 b95479a5 Michael Hanselmann
    if depmgr:
723 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
724 b95479a5 Michael Hanselmann
    else:
725 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
726 b95479a5 Michael Hanselmann
727 be760ba8 Michael Hanselmann
  def IsAcquired(self):
728 be760ba8 Michael Hanselmann
    return self._acquired
729 be760ba8 Michael Hanselmann
730 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
731 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
732 ebb2a2a3 Michael Hanselmann
733 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
734 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
735 6a373640 Michael Hanselmann
736 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
737 be760ba8 Michael Hanselmann
    assert shared == 1
738 be760ba8 Michael Hanselmann
    self._acquired = True
739 be760ba8 Michael Hanselmann
740 be760ba8 Michael Hanselmann
  def release(self):
741 be760ba8 Michael Hanselmann
    assert self._acquired
742 be760ba8 Michael Hanselmann
    self._acquired = False
743 be760ba8 Michael Hanselmann
744 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
745 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
746 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
747 be760ba8 Michael Hanselmann
748 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
749 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
750 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
751 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
752 6a373640 Michael Hanselmann
    return job_ids
753 6a373640 Michael Hanselmann
754 942e2262 Michael Hanselmann
  def StopAcceptingJobs(self):
755 942e2262 Michael Hanselmann
    self._accepting_jobs = False
756 942e2262 Michael Hanselmann
757 942e2262 Michael Hanselmann
  def AcceptingJobsUnlocked(self):
758 942e2262 Michael Hanselmann
    return self._accepting_jobs
759 942e2262 Michael Hanselmann
760 be760ba8 Michael Hanselmann
761 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
762 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
763 ebb2a2a3 Michael Hanselmann
    self._queue = queue
764 be760ba8 Michael Hanselmann
    self._before_start = before_start
765 be760ba8 Michael Hanselmann
    self._after_start = after_start
766 be760ba8 Michael Hanselmann
767 e4e59de8 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None):
768 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
769 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
770 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
771 be760ba8 Michael Hanselmann
772 be760ba8 Michael Hanselmann
    if self._before_start:
773 e4e59de8 Michael Hanselmann
      self._before_start(timeout, cbs.CurrentPriority())
774 be760ba8 Michael Hanselmann
775 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
776 be760ba8 Michael Hanselmann
777 be760ba8 Michael Hanselmann
    if self._after_start:
778 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
779 be760ba8 Michael Hanselmann
780 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
781 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
782 ebb2a2a3 Michael Hanselmann
783 be760ba8 Michael Hanselmann
    if op.fail:
784 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
785 be760ba8 Michael Hanselmann
786 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
787 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
788 6a373640 Michael Hanselmann
789 be760ba8 Michael Hanselmann
    return op.result
790 be760ba8 Michael Hanselmann
791 be760ba8 Michael Hanselmann
792 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
793 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
794 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
795 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
796 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
797 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
798 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
799 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
800 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
801 be760ba8 Michael Hanselmann
    return job
802 be760ba8 Michael Hanselmann
803 26d3fd2f Michael Hanselmann
804 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
805 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
806 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
807 be760ba8 Michael Hanselmann
                      for op in job.ops)
808 be760ba8 Michael Hanselmann
809 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
810 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
811 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
812 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
813 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
814 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
815 be760ba8 Michael Hanselmann
                      job.start_timestamp,
816 be760ba8 Michael Hanselmann
                      job.end_timestamp])
817 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
818 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
819 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
820 be760ba8 Michael Hanselmann
821 be760ba8 Michael Hanselmann
  def testSuccess(self):
822 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
823 be760ba8 Michael Hanselmann
824 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
825 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
826 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
827 be760ba8 Michael Hanselmann
             for i in range(opcount)]
828 be760ba8 Michael Hanselmann
829 be760ba8 Michael Hanselmann
      # Create job
830 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
831 be760ba8 Michael Hanselmann
832 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
833 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
834 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
835 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
836 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
837 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
838 be760ba8 Michael Hanselmann
839 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
840 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
841 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
842 ebb2a2a3 Michael Hanselmann
843 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
844 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
845 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
846 be760ba8 Michael Hanselmann
847 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
848 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
849 be760ba8 Michael Hanselmann
        self.assertFalse(success)
850 be760ba8 Michael Hanselmann
851 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
852 be760ba8 Michael Hanselmann
853 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
854 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
855 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
856 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
857 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
858 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
859 be760ba8 Michael Hanselmann
          # Last opcode
860 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
861 be760ba8 Michael Hanselmann
        else:
862 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
863 be760ba8 Michael Hanselmann
864 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
865 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
866 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
867 be760ba8 Michael Hanselmann
868 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
869 ebb2a2a3 Michael Hanselmann
870 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
871 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
872 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
873 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
874 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
875 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
876 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
877 be760ba8 Michael Hanselmann
                              for op in job.ops))
878 be760ba8 Michael Hanselmann
879 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
880 be760ba8 Michael Hanselmann
881 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
882 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
883 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
884 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
885 be760ba8 Michael Hanselmann
886 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
887 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
888 be760ba8 Michael Hanselmann
889 be760ba8 Michael Hanselmann
    testdata = [
890 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
891 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
892 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
893 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
894 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
895 be760ba8 Michael Hanselmann
      ]
896 be760ba8 Michael Hanselmann
897 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
898 be760ba8 Michael Hanselmann
      # Prepare opcodes
899 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
900 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
901 be760ba8 Michael Hanselmann
                                       i <= failto))
902 be760ba8 Michael Hanselmann
             for i in range(opcount)]
903 be760ba8 Michael Hanselmann
904 be760ba8 Michael Hanselmann
      # Create job
905 a06c6ae8 Michael Hanselmann
      job = self._CreateJob(queue, str(job_id), ops)
906 be760ba8 Michael Hanselmann
907 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
908 be760ba8 Michael Hanselmann
909 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
910 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
911 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
912 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
913 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
914 ebb2a2a3 Michael Hanselmann
        # waitlock to running
915 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
916 ebb2a2a3 Michael Hanselmann
        # Opcode result
917 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
918 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
919 be760ba8 Michael Hanselmann
920 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
921 be760ba8 Michael Hanselmann
          # Last opcode
922 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
923 be760ba8 Michael Hanselmann
          break
924 be760ba8 Michael Hanselmann
925 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
926 be760ba8 Michael Hanselmann
927 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
928 be760ba8 Michael Hanselmann
929 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
930 ebb2a2a3 Michael Hanselmann
931 be760ba8 Michael Hanselmann
      # Check job status
932 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
933 76b62028 Iustin Pop
      self.assertEqual(job.GetInfo(["id"]), [job_id])
934 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
935 be760ba8 Michael Hanselmann
936 be760ba8 Michael Hanselmann
      # Check opcode status
937 be760ba8 Michael Hanselmann
      data = zip(job.ops,
938 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
939 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
940 be760ba8 Michael Hanselmann
941 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
942 be760ba8 Michael Hanselmann
        if idx < failfrom:
943 be760ba8 Michael Hanselmann
          assert not op.input.fail
944 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
945 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
946 be760ba8 Michael Hanselmann
        elif idx <= failto:
947 be760ba8 Michael Hanselmann
          assert op.input.fail
948 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
949 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
950 be760ba8 Michael Hanselmann
        else:
951 be760ba8 Michael Hanselmann
          assert not op.input.fail
952 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
953 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
954 be760ba8 Michael Hanselmann
955 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
956 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
957 be760ba8 Michael Hanselmann
958 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
959 be760ba8 Michael Hanselmann
960 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
961 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
962 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
963 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
964 be760ba8 Michael Hanselmann
965 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
966 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
967 be760ba8 Michael Hanselmann
968 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
969 be760ba8 Michael Hanselmann
           for i in range(5)]
970 be760ba8 Michael Hanselmann
971 be760ba8 Michael Hanselmann
    # Create job
972 be760ba8 Michael Hanselmann
    job_id = 17045
973 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
974 be760ba8 Michael Hanselmann
975 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
976 be760ba8 Michael Hanselmann
977 be760ba8 Michael Hanselmann
    # Mark as cancelled
978 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
979 be760ba8 Michael Hanselmann
    self.assert_(success)
980 be760ba8 Michael Hanselmann
981 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
982 ebb2a2a3 Michael Hanselmann
983 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
984 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
985 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
986 be760ba8 Michael Hanselmann
                            for op in job.ops))
987 be760ba8 Michael Hanselmann
988 66bd7445 Michael Hanselmann
    # Serialize to check for differences
989 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
990 66bd7445 Michael Hanselmann
991 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
992 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
993 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
994 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
995 30c945d0 Michael Hanselmann
996 30c945d0 Michael Hanselmann
    # Check result
997 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
998 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
999 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1000 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
1001 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1002 30c945d0 Michael Hanselmann
                                for op in job.ops))
1003 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1004 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1005 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1006 30c945d0 Michael Hanselmann
1007 66bd7445 Michael Hanselmann
    # Must not have changed or written
1008 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
1009 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1010 66bd7445 Michael Hanselmann
1011 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
1012 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
1013 30c945d0 Michael Hanselmann
1014 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1015 30c945d0 Michael Hanselmann
           for i in range(5)]
1016 30c945d0 Michael Hanselmann
1017 30c945d0 Michael Hanselmann
    # Create job
1018 30c945d0 Michael Hanselmann
    job_id = 8645
1019 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1020 30c945d0 Michael Hanselmann
1021 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1022 30c945d0 Michael Hanselmann
1023 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1024 30c945d0 Michael Hanselmann
1025 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
1026 30c945d0 Michael Hanselmann
1027 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1028 30c945d0 Michael Hanselmann
1029 30c945d0 Michael Hanselmann
    # Mark as cancelling
1030 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
1031 30c945d0 Michael Hanselmann
    self.assert_(success)
1032 30c945d0 Michael Hanselmann
1033 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1034 30c945d0 Michael Hanselmann
1035 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1036 30c945d0 Michael Hanselmann
                            for op in job.ops))
1037 30c945d0 Michael Hanselmann
1038 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1039 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1040 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1041 be760ba8 Michael Hanselmann
1042 be760ba8 Michael Hanselmann
    # Check result
1043 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1044 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1045 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1046 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1047 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1048 be760ba8 Michael Hanselmann
                                for op in job.ops))
1049 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1050 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1051 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1052 be760ba8 Michael Hanselmann
1053 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
1054 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1055 be760ba8 Michael Hanselmann
1056 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1057 be760ba8 Michael Hanselmann
           for i in range(5)]
1058 be760ba8 Michael Hanselmann
1059 be760ba8 Michael Hanselmann
    # Create job
1060 be760ba8 Michael Hanselmann
    job_id = 11009
1061 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1062 be760ba8 Michael Hanselmann
1063 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1064 be760ba8 Michael Hanselmann
1065 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1066 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1067 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1068 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1069 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1070 be760ba8 Michael Hanselmann
1071 be760ba8 Michael Hanselmann
      # Mark as cancelled
1072 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
1073 be760ba8 Michael Hanselmann
      self.assert_(success)
1074 be760ba8 Michael Hanselmann
1075 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1076 be760ba8 Michael Hanselmann
                              for op in job.ops))
1077 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1078 be760ba8 Michael Hanselmann
1079 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1080 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1081 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1082 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1083 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1084 be760ba8 Michael Hanselmann
1085 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1086 be760ba8 Michael Hanselmann
1087 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1088 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1089 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1090 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1091 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1092 be760ba8 Michael Hanselmann
1093 be760ba8 Michael Hanselmann
    # Check result
1094 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1095 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1096 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
1097 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
1098 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1099 be760ba8 Michael Hanselmann
                                for op in job.ops))
1100 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1101 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1102 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1103 be760ba8 Michael Hanselmann
1104 942e2262 Michael Hanselmann
  def _TestCancelWhileSomething(self, cb):
1105 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
1106 9e49dfc5 Michael Hanselmann
1107 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1108 9e49dfc5 Michael Hanselmann
           for i in range(5)]
1109 9e49dfc5 Michael Hanselmann
1110 9e49dfc5 Michael Hanselmann
    # Create job
1111 9e49dfc5 Michael Hanselmann
    job_id = 24314
1112 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1113 9e49dfc5 Michael Hanselmann
1114 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1115 9e49dfc5 Michael Hanselmann
1116 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1117 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1118 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1119 9e49dfc5 Michael Hanselmann
1120 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
1121 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
1122 9e49dfc5 Michael Hanselmann
      self.assert_(success)
1123 9e49dfc5 Michael Hanselmann
1124 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1125 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
1126 9e49dfc5 Michael Hanselmann
1127 942e2262 Michael Hanselmann
      cb(queue)
1128 9e49dfc5 Michael Hanselmann
1129 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
1130 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
1131 9e49dfc5 Michael Hanselmann
1132 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1133 9e49dfc5 Michael Hanselmann
1134 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1135 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1136 9e49dfc5 Michael Hanselmann
1137 9e49dfc5 Michael Hanselmann
    # Check result
1138 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1139 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1140 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
1141 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
1142 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1143 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
1144 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1145 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1146 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
1147 9e49dfc5 Michael Hanselmann
1148 942e2262 Michael Hanselmann
    return queue
1149 942e2262 Michael Hanselmann
1150 942e2262 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
1151 942e2262 Michael Hanselmann
    def fn(_):
1152 942e2262 Michael Hanselmann
      # Fake an acquire attempt timing out
1153 942e2262 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1154 942e2262 Michael Hanselmann
1155 942e2262 Michael Hanselmann
    self._TestCancelWhileSomething(fn)
1156 942e2262 Michael Hanselmann
1157 942e2262 Michael Hanselmann
  def testCancelDuringQueueShutdown(self):
1158 942e2262 Michael Hanselmann
    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1159 942e2262 Michael Hanselmann
    self.assertFalse(queue.AcceptingJobsUnlocked())
1160 942e2262 Michael Hanselmann
1161 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
1162 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
1163 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1164 be760ba8 Michael Hanselmann
1165 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1166 be760ba8 Michael Hanselmann
           for i in range(3)]
1167 be760ba8 Michael Hanselmann
1168 be760ba8 Michael Hanselmann
    # Create job
1169 76b62028 Iustin Pop
    job_id = 28492
1170 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1171 be760ba8 Michael Hanselmann
1172 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1173 be760ba8 Michael Hanselmann
1174 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1175 be760ba8 Michael Hanselmann
1176 be760ba8 Michael Hanselmann
    # Run one opcode
1177 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1178 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1179 be760ba8 Michael Hanselmann
1180 be760ba8 Michael Hanselmann
    # Job goes back to queued
1181 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1182 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1183 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1184 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1185 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1186 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
1187 be760ba8 Michael Hanselmann
1188 be760ba8 Michael Hanselmann
    # Mark as cancelled
1189 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
1190 be760ba8 Michael Hanselmann
    self.assert_(success)
1191 be760ba8 Michael Hanselmann
1192 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
1193 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1194 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1195 be760ba8 Michael Hanselmann
1196 be760ba8 Michael Hanselmann
    # Check result
1197 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1198 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
1199 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1200 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1201 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1202 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1203 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1204 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1205 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
1206 be760ba8 Michael Hanselmann
1207 942e2262 Michael Hanselmann
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
1208 942e2262 Michael Hanselmann
    self.assertTrue(queue.AcceptingJobsUnlocked())
1209 942e2262 Michael Hanselmann
1210 942e2262 Michael Hanselmann
    # Simulate shutdown
1211 942e2262 Michael Hanselmann
    queue.StopAcceptingJobs()
1212 942e2262 Michael Hanselmann
1213 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1214 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1215 942e2262 Michael Hanselmann
1216 942e2262 Michael Hanselmann
    # Check result
1217 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1218 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1219 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1220 942e2262 Michael Hanselmann
    self.assertTrue(job.start_timestamp)
1221 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1222 942e2262 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1223 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1224 942e2262 Michael Hanselmann
                               for op in job.ops[:runcount]))
1225 942e2262 Michael Hanselmann
    self.assertFalse(job.ops[runcount].end_timestamp)
1226 942e2262 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1227 942e2262 Michael Hanselmann
                                for op in job.ops[(runcount + 1):]))
1228 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1229 942e2262 Michael Hanselmann
                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
1230 942e2262 Michael Hanselmann
                       ([constants.OP_STATUS_QUEUED] *
1231 942e2262 Michael Hanselmann
                        (len(job.ops) - runcount))),
1232 942e2262 Michael Hanselmann
                      (["Res%s" % i for i in range(runcount)] +
1233 942e2262 Michael Hanselmann
                       ([None] * (len(job.ops) - runcount)))])
1234 942e2262 Michael Hanselmann
1235 942e2262 Michael Hanselmann
    # Must have been written and replicated
1236 942e2262 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1237 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1238 942e2262 Michael Hanselmann
1239 942e2262 Michael Hanselmann
  def testQueueShutdownWhileRunning(self):
1240 942e2262 Michael Hanselmann
    # Tests shutting down the queue while a job is running
1241 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1242 942e2262 Michael Hanselmann
1243 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1244 942e2262 Michael Hanselmann
           for i in range(3)]
1245 942e2262 Michael Hanselmann
1246 942e2262 Michael Hanselmann
    # Create job
1247 942e2262 Michael Hanselmann
    job_id = 2718211587
1248 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1249 942e2262 Michael Hanselmann
1250 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1251 942e2262 Michael Hanselmann
1252 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1253 942e2262 Michael Hanselmann
1254 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1255 942e2262 Michael Hanselmann
1256 942e2262 Michael Hanselmann
    # Run one opcode
1257 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1258 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1259 942e2262 Michael Hanselmann
1260 942e2262 Michael Hanselmann
    # Job goes back to queued
1261 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1262 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1263 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1264 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1265 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1266 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1267 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1268 942e2262 Michael Hanselmann
1269 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1270 942e2262 Michael Hanselmann
    for _ in range(3):
1271 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1272 942e2262 Michael Hanselmann
1273 942e2262 Michael Hanselmann
    # Run second opcode
1274 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1275 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1276 942e2262 Michael Hanselmann
1277 942e2262 Michael Hanselmann
    # Job goes back to queued
1278 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1279 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1280 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1281 942e2262 Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
1282 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1283 942e2262 Michael Hanselmann
                      ["Res0", "Res1", None]])
1284 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1285 942e2262 Michael Hanselmann
1286 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1287 942e2262 Michael Hanselmann
    for _ in range(3):
1288 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1289 942e2262 Michael Hanselmann
1290 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 2)
1291 942e2262 Michael Hanselmann
1292 942e2262 Michael Hanselmann
  def testQueueShutdownWithLockTimeout(self):
1293 942e2262 Michael Hanselmann
    # Tests shutting down while a lock acquire times out
1294 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1295 942e2262 Michael Hanselmann
1296 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1297 942e2262 Michael Hanselmann
           for i in range(3)]
1298 942e2262 Michael Hanselmann
1299 942e2262 Michael Hanselmann
    # Create job
1300 942e2262 Michael Hanselmann
    job_id = 1304231178
1301 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1302 942e2262 Michael Hanselmann
1303 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1304 942e2262 Michael Hanselmann
1305 942e2262 Michael Hanselmann
    acquire_timeout = False
1306 942e2262 Michael Hanselmann
1307 942e2262 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1308 942e2262 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1309 942e2262 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1310 942e2262 Michael Hanselmann
      if acquire_timeout:
1311 942e2262 Michael Hanselmann
        raise mcpu.LockAcquireTimeout()
1312 942e2262 Michael Hanselmann
1313 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1314 942e2262 Michael Hanselmann
1315 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1316 942e2262 Michael Hanselmann
1317 942e2262 Michael Hanselmann
    # Run one opcode
1318 942e2262 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1319 942e2262 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
1320 942e2262 Michael Hanselmann
1321 942e2262 Michael Hanselmann
    # Job goes back to queued
1322 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1323 942e2262 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1324 942e2262 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1325 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
1326 942e2262 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
1327 942e2262 Michael Hanselmann
                      ["Res0", None, None]])
1328 942e2262 Michael Hanselmann
    self.assertFalse(job.cur_opctx)
1329 942e2262 Michael Hanselmann
1330 942e2262 Michael Hanselmann
    # Writes for waiting, running and result
1331 942e2262 Michael Hanselmann
    for _ in range(3):
1332 942e2262 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1333 942e2262 Michael Hanselmann
1334 942e2262 Michael Hanselmann
    # The next opcode should have expiring lock acquires
1335 942e2262 Michael Hanselmann
    acquire_timeout = True
1336 942e2262 Michael Hanselmann
1337 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 1)
1338 942e2262 Michael Hanselmann
1339 942e2262 Michael Hanselmann
  def testQueueShutdownWhileInQueue(self):
1340 942e2262 Michael Hanselmann
    # This should never happen in reality (no new jobs are started by the
1341 942e2262 Michael Hanselmann
    # workerpool once a shutdown has been initiated), but it's better to test
1342 942e2262 Michael Hanselmann
    # the job processor for this scenario
1343 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1344 942e2262 Michael Hanselmann
1345 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1346 942e2262 Michael Hanselmann
           for i in range(5)]
1347 942e2262 Michael Hanselmann
1348 942e2262 Michael Hanselmann
    # Create job
1349 942e2262 Michael Hanselmann
    job_id = 2031
1350 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1351 942e2262 Michael Hanselmann
1352 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1353 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1354 942e2262 Michael Hanselmann
1355 942e2262 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
1356 942e2262 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
1357 942e2262 Michael Hanselmann
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1358 942e2262 Michael Hanselmann
                               for op in job.ops))
1359 942e2262 Michael Hanselmann
1360 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1361 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1362 942e2262 Michael Hanselmann
1363 942e2262 Michael Hanselmann
  def testQueueShutdownWhileWaitlockInQueue(self):
1364 942e2262 Michael Hanselmann
    queue = _FakeQueueForProc()
1365 942e2262 Michael Hanselmann
1366 942e2262 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1367 942e2262 Michael Hanselmann
           for i in range(5)]
1368 942e2262 Michael Hanselmann
1369 942e2262 Michael Hanselmann
    # Create job
1370 942e2262 Michael Hanselmann
    job_id = 53125685
1371 942e2262 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1372 942e2262 Michael Hanselmann
1373 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1374 942e2262 Michael Hanselmann
1375 942e2262 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
1376 942e2262 Michael Hanselmann
1377 942e2262 Michael Hanselmann
    assert len(job.ops) == 5
1378 942e2262 Michael Hanselmann
1379 942e2262 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1380 942e2262 Michael Hanselmann
1381 942e2262 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1382 942e2262 Michael Hanselmann
1383 942e2262 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1384 942e2262 Michael Hanselmann
    self._TestQueueShutdown(queue, opexec, job, 0)
1385 942e2262 Michael Hanselmann
1386 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
1387 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
1388 be760ba8 Michael Hanselmann
    # program was restarted
1389 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1390 be760ba8 Michael Hanselmann
1391 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1392 be760ba8 Michael Hanselmann
1393 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1394 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1395 be760ba8 Michael Hanselmann
             for i in range(10)]
1396 be760ba8 Michael Hanselmann
1397 be760ba8 Michael Hanselmann
      # Create job
1398 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
1399 be760ba8 Michael Hanselmann
1400 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1401 be760ba8 Michael Hanselmann
1402 be760ba8 Michael Hanselmann
      for _ in range(successcount):
1403 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1404 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
1405 be760ba8 Michael Hanselmann
1406 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1407 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
1408 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
1409 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
1410 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
1411 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
1412 be760ba8 Michael Hanselmann
1413 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
1414 be760ba8 Michael Hanselmann
1415 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
1416 8a3cd185 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1417 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
1418 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
1419 be760ba8 Michael Hanselmann
1420 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1421 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1422 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1423 be760ba8 Michael Hanselmann
1424 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1425 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1426 be760ba8 Michael Hanselmann
1427 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1428 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1429 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1430 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1431 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1432 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1433 be760ba8 Michael Hanselmann
1434 be760ba8 Michael Hanselmann
      if remaining == 0:
1435 be760ba8 Michael Hanselmann
        # Last opcode
1436 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1437 be760ba8 Michael Hanselmann
        break
1438 be760ba8 Michael Hanselmann
1439 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1440 be760ba8 Michael Hanselmann
1441 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1442 be760ba8 Michael Hanselmann
1443 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1444 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1445 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1446 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1447 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1448 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1449 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1450 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1451 be760ba8 Michael Hanselmann
                            for op in job.ops))
1452 be760ba8 Michael Hanselmann
1453 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1454 be760ba8 Michael Hanselmann
1455 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1456 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1457 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1458 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1459 be760ba8 Michael Hanselmann
1460 be760ba8 Michael Hanselmann
    # ... also after being restored
1461 8a3cd185 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1462 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1463 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1464 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1465 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1466 be760ba8 Michael Hanselmann
1467 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1468 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1469 be760ba8 Michael Hanselmann
1470 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1471 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1472 be760ba8 Michael Hanselmann
1473 be760ba8 Michael Hanselmann
    # Create job
1474 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1475 be760ba8 Michael Hanselmann
1476 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1477 be760ba8 Michael Hanselmann
1478 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1479 be760ba8 Michael Hanselmann
1480 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1481 be760ba8 Michael Hanselmann
1482 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1483 be760ba8 Michael Hanselmann
1484 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1485 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1486 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1487 be760ba8 Michael Hanselmann
1488 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1489 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1490 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1491 be760ba8 Michael Hanselmann
1492 be760ba8 Michael Hanselmann
    messages = {
1493 be760ba8 Michael Hanselmann
      1: [
1494 be760ba8 Michael Hanselmann
        (None, "Hello"),
1495 be760ba8 Michael Hanselmann
        (None, "World"),
1496 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1497 be760ba8 Michael Hanselmann
        ],
1498 be760ba8 Michael Hanselmann
      4: [
1499 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1500 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1501 be760ba8 Michael Hanselmann
        ],
1502 be760ba8 Michael Hanselmann
      }
1503 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1504 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1505 be760ba8 Michael Hanselmann
           for i in range(5)]
1506 be760ba8 Michael Hanselmann
1507 be760ba8 Michael Hanselmann
    # Create job
1508 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1509 be760ba8 Michael Hanselmann
1510 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1511 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1512 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1513 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1514 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1515 be760ba8 Michael Hanselmann
1516 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1517 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1518 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1519 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1520 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1521 be760ba8 Michael Hanselmann
1522 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1523 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1524 be760ba8 Michael Hanselmann
1525 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1526 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1527 be760ba8 Michael Hanselmann
        if log_type:
1528 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1529 be760ba8 Michael Hanselmann
        else:
1530 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1531 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1532 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1533 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1534 be760ba8 Michael Hanselmann
1535 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1536 be760ba8 Michael Hanselmann
1537 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1538 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1539 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1540 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1541 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1542 be760ba8 Michael Hanselmann
1543 be760ba8 Michael Hanselmann
      if remaining == 0:
1544 be760ba8 Michael Hanselmann
        # Last opcode
1545 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1546 be760ba8 Michael Hanselmann
        break
1547 be760ba8 Michael Hanselmann
1548 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1549 be760ba8 Michael Hanselmann
1550 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1551 be760ba8 Michael Hanselmann
1552 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1553 ebb2a2a3 Michael Hanselmann
1554 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1555 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1556 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1557 be760ba8 Michael Hanselmann
1558 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1559 be760ba8 Michael Hanselmann
1560 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1561 be760ba8 Michael Hanselmann
1562 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1563 8a3cd185 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1564 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1565 be760ba8 Michael Hanselmann
1566 be760ba8 Michael Hanselmann
    # Check each message
1567 be760ba8 Michael Hanselmann
    prevserial = -1
1568 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1569 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1570 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1571 be760ba8 Michael Hanselmann
        if exptype:
1572 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1573 be760ba8 Michael Hanselmann
        else:
1574 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1575 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1576 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1577 be760ba8 Michael Hanselmann
        prevserial = serial
1578 be760ba8 Michael Hanselmann
1579 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1580 be760ba8 Michael Hanselmann
    # Check serial
1581 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1582 be760ba8 Michael Hanselmann
1583 be760ba8 Michael Hanselmann
    # No filter
1584 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1585 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1586 be760ba8 Michael Hanselmann
                      for entry in entries])
1587 be760ba8 Michael Hanselmann
1588 be760ba8 Michael Hanselmann
    # Filter with serial
1589 be760ba8 Michael Hanselmann
    assert count > 3
1590 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1591 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1592 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1593 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1594 be760ba8 Michael Hanselmann
1595 be760ba8 Michael Hanselmann
    # No log message after highest serial
1596 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1597 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1598 be760ba8 Michael Hanselmann
1599 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1600 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1601 6a373640 Michael Hanselmann
1602 6a373640 Michael Hanselmann
    job_id = 15656
1603 6a373640 Michael Hanselmann
    ops = [
1604 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1605 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1606 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1607 6a373640 Michael Hanselmann
                          submit_jobs=[
1608 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1609 6a373640 Michael Hanselmann
                            ]),
1610 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1611 6a373640 Michael Hanselmann
                          submit_jobs=[
1612 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1613 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1614 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1615 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1616 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1617 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1618 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1619 6a373640 Michael Hanselmann
                            ]),
1620 6a373640 Michael Hanselmann
      ]
1621 6a373640 Michael Hanselmann
1622 6a373640 Michael Hanselmann
    # Create job
1623 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1624 6a373640 Michael Hanselmann
1625 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1626 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1627 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1628 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1629 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1630 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1631 6a373640 Michael Hanselmann
1632 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1633 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1634 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1635 6a373640 Michael Hanselmann
1636 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1637 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1638 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1639 6a373640 Michael Hanselmann
1640 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1641 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1642 6a373640 Michael Hanselmann
      self.assertFalse(success)
1643 6a373640 Michael Hanselmann
1644 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1645 6a373640 Michael Hanselmann
1646 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1647 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1648 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1649 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1650 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1651 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1652 6a373640 Michael Hanselmann
        # Last opcode
1653 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1654 6a373640 Michael Hanselmann
      else:
1655 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1656 6a373640 Michael Hanselmann
1657 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1658 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1659 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1660 6a373640 Michael Hanselmann
1661 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1662 6a373640 Michael Hanselmann
1663 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1664 6a373640 Michael Hanselmann
                                        for op in ops
1665 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1666 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1667 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1668 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1669 6a373640 Michael Hanselmann
1670 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1671 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1672 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1673 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1674 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1675 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1676 6a373640 Michael Hanselmann
1677 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1678 6a373640 Michael Hanselmann
1679 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1680 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1681 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1682 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1683 6a373640 Michael Hanselmann
1684 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1685 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1686 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1687 b95479a5 Michael Hanselmann
1688 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1689 b95479a5 Michael Hanselmann
1690 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1691 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1692 b95479a5 Michael Hanselmann
    job_id = 29929
1693 b95479a5 Michael Hanselmann
    ops = [
1694 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1695 b95479a5 Michael Hanselmann
                          depends=[
1696 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1697 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1698 b95479a5 Michael Hanselmann
                            ]),
1699 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1700 b95479a5 Michael Hanselmann
      ]
1701 b95479a5 Michael Hanselmann
1702 b95479a5 Michael Hanselmann
    # Create job
1703 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1704 b95479a5 Michael Hanselmann
1705 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1706 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1707 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1708 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1709 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1710 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1711 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1712 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1713 b95479a5 Michael Hanselmann
1714 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1715 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1716 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1717 b95479a5 Michael Hanselmann
1718 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1719 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1720 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1721 b95479a5 Michael Hanselmann
1722 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1723 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1724 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1725 b95479a5 Michael Hanselmann
1726 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1727 b95479a5 Michael Hanselmann
1728 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1729 b95479a5 Michael Hanselmann
1730 b95479a5 Michael Hanselmann
    counter = itertools.count()
1731 b95479a5 Michael Hanselmann
    while True:
1732 b95479a5 Michael Hanselmann
      attempt = counter.next()
1733 b95479a5 Michael Hanselmann
1734 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1735 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1736 b95479a5 Michael Hanselmann
1737 b95479a5 Michael Hanselmann
      if attempt < 2:
1738 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1739 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1740 b95479a5 Michael Hanselmann
      elif attempt == 2:
1741 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1742 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1743 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1744 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1745 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1746 b95479a5 Michael Hanselmann
      elif attempt < 5:
1747 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1748 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1749 b95479a5 Michael Hanselmann
      elif attempt == 5:
1750 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1751 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1752 b95479a5 Michael Hanselmann
      if attempt == 2:
1753 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1754 b95479a5 Michael Hanselmann
      elif attempt > 5:
1755 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1756 b95479a5 Michael Hanselmann
      else:
1757 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1758 b95479a5 Michael Hanselmann
1759 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1760 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1761 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1762 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1763 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1764 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1765 b95479a5 Michael Hanselmann
1766 b95479a5 Michael Hanselmann
      if attempt < 5:
1767 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1768 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1769 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1770 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1771 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1772 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1773 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1774 b95479a5 Michael Hanselmann
        continue
1775 b95479a5 Michael Hanselmann
1776 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1777 b95479a5 Michael Hanselmann
        # Last opcode
1778 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1779 b95479a5 Michael Hanselmann
        break
1780 b95479a5 Michael Hanselmann
1781 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1782 b95479a5 Michael Hanselmann
1783 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1784 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1785 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1786 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1787 b95479a5 Michael Hanselmann
1788 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1789 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1790 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1791 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1792 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1793 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1794 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1795 b95479a5 Michael Hanselmann
                               for op in job.ops))
1796 b95479a5 Michael Hanselmann
1797 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1798 b95479a5 Michael Hanselmann
1799 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1800 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1801 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1802 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1803 b95479a5 Michael Hanselmann
1804 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1805 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1806 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1807 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1808 b95479a5 Michael Hanselmann
1809 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1810 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1811 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1812 b95479a5 Michael Hanselmann
1813 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1814 b95479a5 Michael Hanselmann
1815 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1816 b95479a5 Michael Hanselmann
    job_id = 30876
1817 b95479a5 Michael Hanselmann
    ops = [
1818 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1819 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1820 b95479a5 Michael Hanselmann
                          depends=[
1821 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1822 b95479a5 Michael Hanselmann
                            ]),
1823 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1824 b95479a5 Michael Hanselmann
      ]
1825 b95479a5 Michael Hanselmann
1826 b95479a5 Michael Hanselmann
    # Create job
1827 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1828 b95479a5 Michael Hanselmann
1829 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1830 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1831 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1832 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1833 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1834 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1835 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1836 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1837 b95479a5 Michael Hanselmann
1838 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1839 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1840 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1841 b95479a5 Michael Hanselmann
1842 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1843 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1844 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1845 b95479a5 Michael Hanselmann
1846 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1847 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1848 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1849 b95479a5 Michael Hanselmann
1850 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1851 b95479a5 Michael Hanselmann
1852 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1853 b95479a5 Michael Hanselmann
1854 b95479a5 Michael Hanselmann
    counter = itertools.count()
1855 b95479a5 Michael Hanselmann
    while True:
1856 b95479a5 Michael Hanselmann
      attempt = counter.next()
1857 b95479a5 Michael Hanselmann
1858 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1859 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1860 b95479a5 Michael Hanselmann
1861 b95479a5 Michael Hanselmann
      if attempt == 0:
1862 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1863 b95479a5 Michael Hanselmann
        pass
1864 b95479a5 Michael Hanselmann
      elif attempt < 4:
1865 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1866 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1867 b95479a5 Michael Hanselmann
      elif attempt == 4:
1868 b95479a5 Michael Hanselmann
        # Other job was cancelled
1869 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1870 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1871 b95479a5 Michael Hanselmann
1872 b95479a5 Michael Hanselmann
      if attempt == 0:
1873 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1874 b95479a5 Michael Hanselmann
      else:
1875 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1876 b95479a5 Michael Hanselmann
1877 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1878 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1879 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1880 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1881 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1882 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1883 b95479a5 Michael Hanselmann
1884 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1885 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1886 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1887 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1888 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1889 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1890 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1891 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1892 b95479a5 Michael Hanselmann
        continue
1893 b95479a5 Michael Hanselmann
1894 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1895 b95479a5 Michael Hanselmann
        # Last opcode
1896 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1897 b95479a5 Michael Hanselmann
        break
1898 b95479a5 Michael Hanselmann
1899 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1900 b95479a5 Michael Hanselmann
1901 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1902 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1903 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1904 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1905 b95479a5 Michael Hanselmann
1906 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1907 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1908 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1909 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1910 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1911 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1912 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1913 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1914 b95479a5 Michael Hanselmann
1915 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1916 b95479a5 Michael Hanselmann
1917 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1918 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1919 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1920 b95479a5 Michael Hanselmann
1921 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1922 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1923 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1924 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1925 b95479a5 Michael Hanselmann
1926 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1927 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1928 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1929 b95479a5 Michael Hanselmann
1930 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1931 b95479a5 Michael Hanselmann
1932 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1933 b95479a5 Michael Hanselmann
    job_id = 11763
1934 b95479a5 Michael Hanselmann
    ops = [
1935 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1936 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1937 b95479a5 Michael Hanselmann
                          depends=[
1938 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1939 b95479a5 Michael Hanselmann
                            ]),
1940 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1941 b95479a5 Michael Hanselmann
      ]
1942 b95479a5 Michael Hanselmann
1943 b95479a5 Michael Hanselmann
    # Create job
1944 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1945 b95479a5 Michael Hanselmann
1946 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1947 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1948 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1949 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1950 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1951 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1952 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1953 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1954 b95479a5 Michael Hanselmann
1955 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1956 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1957 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1958 b95479a5 Michael Hanselmann
1959 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1960 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1961 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1962 b95479a5 Michael Hanselmann
1963 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1964 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1965 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1966 b95479a5 Michael Hanselmann
1967 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1968 b95479a5 Michael Hanselmann
1969 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1970 b95479a5 Michael Hanselmann
1971 b95479a5 Michael Hanselmann
    counter = itertools.count()
1972 b95479a5 Michael Hanselmann
    while True:
1973 b95479a5 Michael Hanselmann
      attempt = counter.next()
1974 b95479a5 Michael Hanselmann
1975 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1976 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1977 b95479a5 Michael Hanselmann
1978 b95479a5 Michael Hanselmann
      if attempt == 0:
1979 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1980 b95479a5 Michael Hanselmann
        pass
1981 b95479a5 Michael Hanselmann
      elif attempt < 4:
1982 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1983 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1984 b95479a5 Michael Hanselmann
      elif attempt == 4:
1985 b95479a5 Michael Hanselmann
        # Other job failed
1986 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1987 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1988 b95479a5 Michael Hanselmann
1989 b95479a5 Michael Hanselmann
      if attempt == 0:
1990 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1991 b95479a5 Michael Hanselmann
      else:
1992 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1993 b95479a5 Michael Hanselmann
1994 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1995 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1996 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1997 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1998 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1999 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
2000 b95479a5 Michael Hanselmann
2001 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
2002 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
2003 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
2004 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
2005 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2006 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
2007 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
2008 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
2009 b95479a5 Michael Hanselmann
        continue
2010 b95479a5 Michael Hanselmann
2011 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2012 b95479a5 Michael Hanselmann
        # Last opcode
2013 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2014 b95479a5 Michael Hanselmann
        break
2015 b95479a5 Michael Hanselmann
2016 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
2017 b95479a5 Michael Hanselmann
2018 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2019 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2020 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
2021 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2022 b95479a5 Michael Hanselmann
2023 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
2024 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
2025 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2026 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2027 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
2028 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
2029 b95479a5 Michael Hanselmann
2030 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
2031 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
2032 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
2033 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
2034 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
2035 b95479a5 Michael Hanselmann
2036 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
2037 b95479a5 Michael Hanselmann
2038 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2039 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2040 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
2041 b95479a5 Michael Hanselmann
2042 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2043 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2044 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2045 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
2046 b95479a5 Michael Hanselmann
2047 be760ba8 Michael Hanselmann
2048 df5a5730 Michael Hanselmann
class TestEvaluateJobProcessorResult(unittest.TestCase):
2049 df5a5730 Michael Hanselmann
  def testFinished(self):
2050 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2051 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(30953)
2052 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2053 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.FINISHED)
2054 df5a5730 Michael Hanselmann
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2055 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2056 df5a5730 Michael Hanselmann
2057 df5a5730 Michael Hanselmann
  def testDefer(self):
2058 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2059 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(11326, priority=5463)
2060 df5a5730 Michael Hanselmann
    try:
2061 df5a5730 Michael Hanselmann
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2062 df5a5730 Michael Hanselmann
                                         jqueue._JobProcessor.DEFER)
2063 df5a5730 Michael Hanselmann
    except workerpool.DeferTask, err:
2064 df5a5730 Michael Hanselmann
      self.assertEqual(err.priority, 5463)
2065 df5a5730 Michael Hanselmann
    else:
2066 df5a5730 Michael Hanselmann
      self.fail("Didn't raise exception")
2067 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2068 df5a5730 Michael Hanselmann
2069 df5a5730 Michael Hanselmann
  def testWaitdep(self):
2070 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2071 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(21317)
2072 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2073 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.WAITDEP)
2074 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2075 df5a5730 Michael Hanselmann
2076 df5a5730 Michael Hanselmann
  def testOther(self):
2077 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
2078 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(5813)
2079 df5a5730 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
2080 df5a5730 Michael Hanselmann
                      jqueue._EvaluateJobProcessorResult,
2081 df5a5730 Michael Hanselmann
                      depmgr, job, "Other result")
2082 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2083 df5a5730 Michael Hanselmann
2084 df5a5730 Michael Hanselmann
2085 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
2086 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
2087 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
2088 26d3fd2f Michael Hanselmann
    self.attempts = 0
2089 26d3fd2f Michael Hanselmann
    self.last_timeout = None
2090 26d3fd2f Michael Hanselmann
2091 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
2092 26d3fd2f Michael Hanselmann
    self.attempts += 1
2093 26d3fd2f Michael Hanselmann
    if self.timeouts:
2094 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
2095 26d3fd2f Michael Hanselmann
    else:
2096 26d3fd2f Michael Hanselmann
      timeout = None
2097 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
2098 26d3fd2f Michael Hanselmann
    return timeout
2099 26d3fd2f Michael Hanselmann
2100 26d3fd2f Michael Hanselmann
2101 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2102 26d3fd2f Michael Hanselmann
  def setUp(self):
2103 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
2104 26d3fd2f Michael Hanselmann
    self.job = None
2105 26d3fd2f Michael Hanselmann
    self.curop = None
2106 26d3fd2f Michael Hanselmann
    self.opcounter = None
2107 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
2108 26d3fd2f Michael Hanselmann
    self.retries = 0
2109 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
2110 26d3fd2f Michael Hanselmann
    self.prev_prio = None
2111 5fd6b694 Michael Hanselmann
    self.prev_status = None
2112 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
2113 26d3fd2f Michael Hanselmann
    self.gave_lock = None
2114 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
2115 26d3fd2f Michael Hanselmann
2116 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2117 26d3fd2f Michael Hanselmann
    job = self.job
2118 26d3fd2f Michael Hanselmann
2119 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
2120 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
2121 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2122 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2123 5fd6b694 Michael Hanselmann
2124 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2125 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2126 26d3fd2f Michael Hanselmann
2127 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
2128 26d3fd2f Michael Hanselmann
2129 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
2130 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
2131 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
2132 26d3fd2f Michael Hanselmann
2133 26d3fd2f Michael Hanselmann
    self.gave_lock = True
2134 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
2135 26d3fd2f Michael Hanselmann
2136 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
2137 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2138 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
2139 26d3fd2f Michael Hanselmann
      assert self.retries == 7
2140 26d3fd2f Michael Hanselmann
      self.retries = 0
2141 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
2142 26d3fd2f Michael Hanselmann
      return
2143 26d3fd2f Michael Hanselmann
2144 26d3fd2f Michael Hanselmann
    if self.retries > 0:
2145 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
2146 26d3fd2f Michael Hanselmann
      self.retries -= 1
2147 26d3fd2f Michael Hanselmann
      self.gave_lock = False
2148 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
2149 26d3fd2f Michael Hanselmann
2150 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2151 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2152 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
2153 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
2154 26d3fd2f Michael Hanselmann
2155 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
2156 26d3fd2f Michael Hanselmann
    job = self.job
2157 26d3fd2f Michael Hanselmann
2158 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
2159 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2160 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2161 5fd6b694 Michael Hanselmann
2162 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2163 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2164 26d3fd2f Michael Hanselmann
2165 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
2166 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
2167 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
2168 26d3fd2f Michael Hanselmann
2169 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
2170 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
2171 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
2172 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
2173 26d3fd2f Michael Hanselmann
2174 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
2175 26d3fd2f Michael Hanselmann
    job = self.job
2176 26d3fd2f Michael Hanselmann
2177 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
2178 26d3fd2f Michael Hanselmann
2179 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
2180 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
2181 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2182 26d3fd2f Michael Hanselmann
2183 26d3fd2f Michael Hanselmann
    if self.curop == 1:
2184 26d3fd2f Michael Hanselmann
      # Normal retry
2185 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2186 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
2187 26d3fd2f Michael Hanselmann
2188 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
2189 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
2190 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
2191 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2192 26d3fd2f Michael Hanselmann
2193 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
2194 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
2195 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
2196 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2197 26d3fd2f Michael Hanselmann
2198 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
2199 26d3fd2f Michael Hanselmann
2200 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
2201 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
2202 26d3fd2f Michael Hanselmann
2203 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
2204 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
2205 26d3fd2f Michael Hanselmann
      self.retries = 0
2206 26d3fd2f Michael Hanselmann
2207 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
2208 26d3fd2f Michael Hanselmann
      # Normal retry
2209 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
2210 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
2211 26d3fd2f Michael Hanselmann
2212 26d3fd2f Michael Hanselmann
    else:
2213 26d3fd2f Michael Hanselmann
      timeouts = []
2214 26d3fd2f Michael Hanselmann
      self.retries = 0
2215 26d3fd2f Michael Hanselmann
2216 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
2217 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
2218 26d3fd2f Michael Hanselmann
2219 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
2220 26d3fd2f Michael Hanselmann
2221 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
2222 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
2223 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
2224 26d3fd2f Michael Hanselmann
2225 26d3fd2f Michael Hanselmann
    return ts
2226 26d3fd2f Michael Hanselmann
2227 26d3fd2f Michael Hanselmann
  def testTimeout(self):
2228 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2229 26d3fd2f Michael Hanselmann
           for i in range(10)]
2230 26d3fd2f Michael Hanselmann
2231 26d3fd2f Michael Hanselmann
    # Create job
2232 26d3fd2f Michael Hanselmann
    job_id = 15801
2233 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2234 26d3fd2f Michael Hanselmann
    self.job = job
2235 26d3fd2f Michael Hanselmann
2236 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
2237 26d3fd2f Michael Hanselmann
2238 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2239 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
2240 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
2241 26d3fd2f Michael Hanselmann
2242 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
2243 26d3fd2f Michael Hanselmann
2244 5fd6b694 Michael Hanselmann
    while True:
2245 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
2246 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
2247 26d3fd2f Michael Hanselmann
2248 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2249 5fd6b694 Michael Hanselmann
2250 5fd6b694 Michael Hanselmann
      if self.curop is not None:
2251 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
2252 5fd6b694 Michael Hanselmann
2253 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
2254 5fd6b694 Michael Hanselmann
2255 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
2256 5fd6b694 Michael Hanselmann
      assert self.curop is not None
2257 5fd6b694 Michael Hanselmann
2258 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2259 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
2260 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2261 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2262 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
2263 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2264 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
2265 5fd6b694 Michael Hanselmann
2266 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
2267 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
2268 26d3fd2f Michael Hanselmann
        break
2269 26d3fd2f Michael Hanselmann
2270 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2271 26d3fd2f Michael Hanselmann
2272 5fd6b694 Michael Hanselmann
      if self.curop == 0:
2273 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
2274 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
2275 5fd6b694 Michael Hanselmann
2276 26d3fd2f Michael Hanselmann
      if self.gave_lock:
2277 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
2278 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2279 26d3fd2f Michael Hanselmann
      else:
2280 5fd6b694 Michael Hanselmann
        # Did not get locks
2281 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
2282 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2283 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
2284 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
2285 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2286 5fd6b694 Michael Hanselmann
2287 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
2288 5fd6b694 Michael Hanselmann
        # updated
2289 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
2290 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2291 5fd6b694 Michael Hanselmann
2292 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2293 26d3fd2f Michael Hanselmann
2294 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
2295 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
2296 26d3fd2f Michael Hanselmann
2297 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
2298 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
2299 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
2300 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
2301 26d3fd2f Michael Hanselmann
2302 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2303 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2304 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2305 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
2306 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
2307 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
2308 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2309 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2310 26d3fd2f Michael Hanselmann
                            for op in job.ops))
2311 26d3fd2f Michael Hanselmann
2312 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
2313 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2314 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2315 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2316 26d3fd2f Michael Hanselmann
2317 26d3fd2f Michael Hanselmann
2318 4679547e Michael Hanselmann
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2319 4679547e Michael Hanselmann
  def setUp(self):
2320 4679547e Michael Hanselmann
    self.queue = _FakeQueueForProc()
2321 4679547e Michael Hanselmann
    self.opexecprio = []
2322 4679547e Michael Hanselmann
2323 4679547e Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
2324 4679547e Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
2325 4679547e Michael Hanselmann
    self.opexecprio.append(priority)
2326 4679547e Michael Hanselmann
2327 4679547e Michael Hanselmann
  def testChangePriorityWhileRunning(self):
2328 4679547e Michael Hanselmann
    # Tests changing the priority on a job while it has finished opcodes
2329 4679547e Michael Hanselmann
    # (successful) and more, unprocessed ones
2330 4679547e Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2331 4679547e Michael Hanselmann
           for i in range(3)]
2332 4679547e Michael Hanselmann
2333 4679547e Michael Hanselmann
    # Create job
2334 4679547e Michael Hanselmann
    job_id = 3499
2335 4679547e Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
2336 4679547e Michael Hanselmann
2337 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2338 4679547e Michael Hanselmann
2339 4679547e Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2340 4679547e Michael Hanselmann
2341 4679547e Michael Hanselmann
    # Run first opcode
2342 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2343 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2344 4679547e Michael Hanselmann
2345 4679547e Michael Hanselmann
    # Job goes back to queued
2346 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2347 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2348 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2349 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2350 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
2351 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2352 4679547e Michael Hanselmann
                      ["Res0", None, None]])
2353 4679547e Michael Hanselmann
2354 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2355 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2356 4679547e Michael Hanselmann
2357 4679547e Michael Hanselmann
    # Change priority
2358 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(-10),
2359 4679547e Michael Hanselmann
                     (True,
2360 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2361 4679547e Michael Hanselmann
                       " been changed to -10")))
2362 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2363 4679547e Michael Hanselmann
2364 4679547e Michael Hanselmann
    # Process second opcode
2365 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2366 4679547e Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
2367 4679547e Michael Hanselmann
2368 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), -10)
2369 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2370 4679547e Michael Hanselmann
2371 4679547e Michael Hanselmann
    # Check status
2372 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2373 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), -10)
2374 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2375 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2376 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2377 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2378 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2379 4679547e Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
2380 4679547e Michael Hanselmann
                      ["Res0", "Res1", None]])
2381 4679547e Michael Hanselmann
2382 4679547e Michael Hanselmann
    # Change priority once more
2383 4679547e Michael Hanselmann
    self.assertEqual(job.ChangePriority(5),
2384 4679547e Michael Hanselmann
                     (True,
2385 4679547e Michael Hanselmann
                      ("Priorities of pending opcodes for job 3499 have"
2386 4679547e Michael Hanselmann
                       " been changed to 5")))
2387 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), 5)
2388 4679547e Michael Hanselmann
2389 4679547e Michael Hanselmann
    # Process third opcode
2390 4679547e Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2391 4679547e Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
2392 4679547e Michael Hanselmann
2393 4679547e Michael Hanselmann
    self.assertEqual(self.opexecprio.pop(0), 5)
2394 4679547e Michael Hanselmann
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2395 4679547e Michael Hanselmann
2396 4679547e Michael Hanselmann
    # Check status
2397 4679547e Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2398 4679547e Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2399 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2400 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2401 4679547e Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2402 4679547e Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
2403 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS,
2404 4679547e Michael Hanselmann
                       constants.OP_STATUS_SUCCESS],
2405 4679547e Michael Hanselmann
                      ["Res0", "Res1", "Res2"]])
2406 4679547e Michael Hanselmann
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2407 4679547e Michael Hanselmann
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2408 4679547e Michael Hanselmann
2409 4679547e Michael Hanselmann
2410 1b5150ab Michael Hanselmann
class _IdOnlyFakeJob:
2411 1b5150ab Michael Hanselmann
  def __init__(self, job_id, priority=NotImplemented):
2412 1b5150ab Michael Hanselmann
    self.id = str(job_id)
2413 1b5150ab Michael Hanselmann
    self._priority = priority
2414 1b5150ab Michael Hanselmann
2415 1b5150ab Michael Hanselmann
  def CalcPriority(self):
2416 1b5150ab Michael Hanselmann
    return self._priority
2417 b95479a5 Michael Hanselmann
2418 1b5150ab Michael Hanselmann
2419 1b5150ab Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
2420 b95479a5 Michael Hanselmann
  def setUp(self):
2421 b95479a5 Michael Hanselmann
    self._status = []
2422 b95479a5 Michael Hanselmann
    self._queue = []
2423 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2424 b95479a5 Michael Hanselmann
2425 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
2426 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
2427 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
2428 b95479a5 Michael Hanselmann
    return result
2429 b95479a5 Michael Hanselmann
2430 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
2431 37d76f1e Michael Hanselmann
    self.assertFalse(self.jdm._lock.is_owned(),
2432 37d76f1e Michael Hanselmann
                     msg=("Must not own manager lock while re-adding jobs"
2433 37d76f1e Michael Hanselmann
                          " (potential deadlock)"))
2434 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
2435 b95479a5 Michael Hanselmann
2436 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
2437 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(17697)
2438 b95479a5 Michael Hanselmann
    job_id = str(28625)
2439 b95479a5 Michael Hanselmann
2440 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2441 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2442 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2443 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2444 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2445 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2446 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2447 b95479a5 Michael Hanselmann
      job_id: set([job]),
2448 b95479a5 Michael Hanselmann
      })
2449 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2450 fcb21ad7 Michael Hanselmann
      ("job/28625", None, None, [("job", [job.id])])
2451 fcb21ad7 Michael Hanselmann
      ])
2452 b95479a5 Michael Hanselmann
2453 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2454 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2455 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
2456 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2457 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2458 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2459 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2460 b95479a5 Michael Hanselmann
2461 942e2262 Michael Hanselmann
  def testNotFinalizedThenQueued(self):
2462 942e2262 Michael Hanselmann
    # This can happen on a queue shutdown
2463 942e2262 Michael Hanselmann
    job = _IdOnlyFakeJob(1320)
2464 942e2262 Michael Hanselmann
    job_id = str(22971)
2465 942e2262 Michael Hanselmann
2466 942e2262 Michael Hanselmann
    for i in range(5):
2467 942e2262 Michael Hanselmann
      if i > 2:
2468 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2469 942e2262 Michael Hanselmann
      else:
2470 942e2262 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2471 942e2262 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2472 942e2262 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2473 942e2262 Michael Hanselmann
      self.assertFalse(self._status)
2474 942e2262 Michael Hanselmann
      self.assertFalse(self._queue)
2475 942e2262 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2476 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2477 942e2262 Michael Hanselmann
        job_id: set([job]),
2478 942e2262 Michael Hanselmann
        })
2479 942e2262 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2480 942e2262 Michael Hanselmann
        ("job/22971", None, None, [("job", [job.id])])
2481 942e2262 Michael Hanselmann
        ])
2482 942e2262 Michael Hanselmann
2483 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
2484 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(5278)
2485 b95479a5 Michael Hanselmann
    job_id = str(9610)
2486 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
2487 b95479a5 Michael Hanselmann
2488 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2489 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2490 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2491 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2492 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2493 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2494 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2495 b95479a5 Michael Hanselmann
      job_id: set([job]),
2496 b95479a5 Michael Hanselmann
      })
2497 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2498 fcb21ad7 Michael Hanselmann
      ("job/9610", None, None, [("job", [job.id])])
2499 fcb21ad7 Michael Hanselmann
      ])
2500 b95479a5 Michael Hanselmann
2501 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2502 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2503 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2504 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2505 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2506 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2507 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2508 b95479a5 Michael Hanselmann
2509 b95479a5 Michael Hanselmann
  def testRequireError(self):
2510 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(21459)
2511 b95479a5 Michael Hanselmann
    job_id = str(25519)
2512 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
2513 b95479a5 Michael Hanselmann
2514 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2515 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2516 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2517 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2518 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2519 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2520 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2521 b95479a5 Michael Hanselmann
      job_id: set([job]),
2522 b95479a5 Michael Hanselmann
      })
2523 b95479a5 Michael Hanselmann
2524 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2525 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2526 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2527 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2528 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2529 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2530 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2531 b95479a5 Michael Hanselmann
2532 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
2533 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
2534 b95479a5 Michael Hanselmann
2535 b95479a5 Michael Hanselmann
    for end_status in dep_status:
2536 1b5150ab Michael Hanselmann
      job = _IdOnlyFakeJob(21343)
2537 b95479a5 Michael Hanselmann
      job_id = str(14609)
2538 b95479a5 Michael Hanselmann
2539 47099cd1 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2540 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2541 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2542 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2543 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2544 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2545 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2546 b95479a5 Michael Hanselmann
        job_id: set([job]),
2547 b95479a5 Michael Hanselmann
        })
2548 fcb21ad7 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2549 fcb21ad7 Michael Hanselmann
        ("job/14609", None, None, [("job", [job.id])])
2550 fcb21ad7 Michael Hanselmann
        ])
2551 b95479a5 Michael Hanselmann
2552 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
2553 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2554 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
2555 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2556 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2557 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
2558 fcb21ad7 Michael Hanselmann
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2559 b95479a5 Michael Hanselmann
2560 b95479a5 Michael Hanselmann
  def testNotify(self):
2561 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(8227)
2562 b95479a5 Michael Hanselmann
    job_id = str(4113)
2563 b95479a5 Michael Hanselmann
2564 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2565 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2566 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2567 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2568 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2569 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2570 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2571 b95479a5 Michael Hanselmann
      job_id: set([job]),
2572 b95479a5 Michael Hanselmann
      })
2573 b95479a5 Michael Hanselmann
2574 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
2575 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2576 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2577 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2578 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
2579 b95479a5 Michael Hanselmann
2580 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
2581 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(10102)
2582 b95479a5 Michael Hanselmann
    job_id = str(1271)
2583 b95479a5 Michael Hanselmann
2584 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2585 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2586 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2587 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2588 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2589 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2590 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2591 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2592 b95479a5 Michael Hanselmann
      job_id: set([job]),
2593 b95479a5 Michael Hanselmann
      })
2594 b95479a5 Michael Hanselmann
2595 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2596 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2597 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2598 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2599 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2600 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2601 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2602 b95479a5 Michael Hanselmann
2603 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2604 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(24273)
2605 b95479a5 Michael Hanselmann
    job_id = str(23885)
2606 b95479a5 Michael Hanselmann
2607 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2608 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2609 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2610 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2611 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2612 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2613 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2614 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2615 b95479a5 Michael Hanselmann
      job_id: set([job]),
2616 b95479a5 Michael Hanselmann
      })
2617 b95479a5 Michael Hanselmann
2618 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2619 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2620 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2621 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2622 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2623 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2624 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2625 b95479a5 Michael Hanselmann
2626 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2627 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(224)
2628 b95479a5 Michael Hanselmann
    job_id = str(3081)
2629 b95479a5 Michael Hanselmann
2630 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2631 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2632 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2633 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2634 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2635 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2636 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2637 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2638 b95479a5 Michael Hanselmann
      job_id: set(),
2639 b95479a5 Michael Hanselmann
      })
2640 b95479a5 Michael Hanselmann
2641 b95479a5 Michael Hanselmann
    # Force cleanup
2642 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2643 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2644 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2645 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2646 b95479a5 Michael Hanselmann
2647 fcb21ad7 Michael Hanselmann
  def testMultipleWaiting(self):
2648 fcb21ad7 Michael Hanselmann
    # Use a deterministic random generator
2649 fcb21ad7 Michael Hanselmann
    rnd = random.Random(21402)
2650 fcb21ad7 Michael Hanselmann
2651 fcb21ad7 Michael Hanselmann
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2652 fcb21ad7 Michael Hanselmann
2653 fcb21ad7 Michael Hanselmann
    waiters = dict((job_ids.pop(),
2654 1b5150ab Michael Hanselmann
                    set(map(_IdOnlyFakeJob,
2655 fcb21ad7 Michael Hanselmann
                            [job_ids.pop()
2656 fcb21ad7 Michael Hanselmann
                             for _ in range(rnd.randint(1, 20))])))
2657 fcb21ad7 Michael Hanselmann
                   for _ in range(10))
2658 fcb21ad7 Michael Hanselmann
2659 fcb21ad7 Michael Hanselmann
    # Ensure there are no duplicate job IDs
2660 fcb21ad7 Michael Hanselmann
    assert not utils.FindDuplicates(waiters.keys() +
2661 fcb21ad7 Michael Hanselmann
                                    [job.id
2662 fcb21ad7 Michael Hanselmann
                                     for jobs in waiters.values()
2663 fcb21ad7 Michael Hanselmann
                                     for job in jobs])
2664 fcb21ad7 Michael Hanselmann
2665 fcb21ad7 Michael Hanselmann
    # Register all jobs as waiters
2666 fcb21ad7 Michael Hanselmann
    for job_id, job in [(job_id, job)
2667 fcb21ad7 Michael Hanselmann
                        for (job_id, jobs) in waiters.items()
2668 fcb21ad7 Michael Hanselmann
                        for job in jobs]:
2669 fcb21ad7 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2670 fcb21ad7 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2671 fcb21ad7 Michael Hanselmann
                                              [constants.JOB_STATUS_SUCCESS])
2672 fcb21ad7 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2673 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._status)
2674 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._queue)
2675 fcb21ad7 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2676 fcb21ad7 Michael Hanselmann
2677 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, waiters)
2678 fcb21ad7 Michael Hanselmann
2679 fcb21ad7 Michael Hanselmann
    def _MakeSet((name, mode, owner_names, pending)):
2680 fcb21ad7 Michael Hanselmann
      return (name, mode, owner_names,
2681 fcb21ad7 Michael Hanselmann
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2682 fcb21ad7 Michael Hanselmann
2683 fcb21ad7 Michael Hanselmann
    def _CheckLockInfo():
2684 fcb21ad7 Michael Hanselmann
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2685 fcb21ad7 Michael Hanselmann
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2686 fcb21ad7 Michael Hanselmann
        ("job/%s" % job_id, None, None,
2687 fcb21ad7 Michael Hanselmann
         [("job", set([job.id for job in jobs]))])
2688 fcb21ad7 Michael Hanselmann
        for job_id, jobs in waiters.items()
2689 fcb21ad7 Michael Hanselmann
        if jobs
2690 fcb21ad7 Michael Hanselmann
        ]))
2691 fcb21ad7 Michael Hanselmann
2692 fcb21ad7 Michael Hanselmann
    _CheckLockInfo()
2693 fcb21ad7 Michael Hanselmann
2694 fcb21ad7 Michael Hanselmann
    # Notify in random order
2695 fcb21ad7 Michael Hanselmann
    for job_id in rnd.sample(waiters, len(waiters)):
2696 fcb21ad7 Michael Hanselmann
      # Remove from pending waiter list
2697 fcb21ad7 Michael Hanselmann
      jobs = waiters.pop(job_id)
2698 fcb21ad7 Michael Hanselmann
      for job in jobs:
2699 fcb21ad7 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2700 fcb21ad7 Michael Hanselmann
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2701 fcb21ad7 Michael Hanselmann
                                                [constants.JOB_STATUS_SUCCESS])
2702 fcb21ad7 Michael Hanselmann
        self.assertEqual(result, self.jdm.CONTINUE)
2703 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._status)
2704 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._queue)
2705 fcb21ad7 Michael Hanselmann
        self.assertFalse(self.jdm.JobWaiting(job))
2706 fcb21ad7 Michael Hanselmann
2707 fcb21ad7 Michael Hanselmann
      _CheckLockInfo()
2708 fcb21ad7 Michael Hanselmann
2709 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2710 fcb21ad7 Michael Hanselmann
2711 fcb21ad7 Michael Hanselmann
    assert not waiters
2712 fcb21ad7 Michael Hanselmann
2713 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2714 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(18937)
2715 b95479a5 Michael Hanselmann
2716 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2717 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2718 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2719 b95479a5 Michael Hanselmann
2720 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2721 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(30540)
2722 b95479a5 Michael Hanselmann
    job_id = str(23769)
2723 b95479a5 Michael Hanselmann
2724 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2725 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2726 b95479a5 Michael Hanselmann
2727 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2728 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2729 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2730 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2731 fcb21ad7 Michael Hanselmann
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2732 b95479a5 Michael Hanselmann
2733 b95479a5 Michael Hanselmann
2734 989a8bee Michael Hanselmann
if __name__ == "__main__":
2735 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()