Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 75d81fc8

History | View | Annotate | Download (71.2 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 1e6d5750 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 989a8bee Michael Hanselmann
32 989a8bee Michael Hanselmann
from ganeti import constants
33 989a8bee Michael Hanselmann
from ganeti import utils
34 989a8bee Michael Hanselmann
from ganeti import errors
35 989a8bee Michael Hanselmann
from ganeti import jqueue
36 8f5c488d Michael Hanselmann
from ganeti import opcodes
37 8f5c488d Michael Hanselmann
from ganeti import compat
38 26d3fd2f Michael Hanselmann
from ganeti import mcpu
39 989a8bee Michael Hanselmann
40 989a8bee Michael Hanselmann
import testutils
41 989a8bee Michael Hanselmann
42 989a8bee Michael Hanselmann
43 989a8bee Michael Hanselmann
class _FakeJob:
44 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
45 989a8bee Michael Hanselmann
    self.id = job_id
46 c0f6d0d8 Michael Hanselmann
    self.writable = False
47 989a8bee Michael Hanselmann
    self._status = status
48 989a8bee Michael Hanselmann
    self._log = []
49 989a8bee Michael Hanselmann
50 989a8bee Michael Hanselmann
  def SetStatus(self, status):
51 989a8bee Michael Hanselmann
    self._status = status
52 989a8bee Michael Hanselmann
53 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
54 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
55 989a8bee Michael Hanselmann
56 989a8bee Michael Hanselmann
  def CalcStatus(self):
57 989a8bee Michael Hanselmann
    return self._status
58 989a8bee Michael Hanselmann
59 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
60 989a8bee Michael Hanselmann
    result = []
61 989a8bee Michael Hanselmann
62 989a8bee Michael Hanselmann
    for name in fields:
63 989a8bee Michael Hanselmann
      if name == "status":
64 989a8bee Michael Hanselmann
        result.append(self._status)
65 989a8bee Michael Hanselmann
      else:
66 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
67 989a8bee Michael Hanselmann
68 989a8bee Michael Hanselmann
    return result
69 989a8bee Michael Hanselmann
70 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
71 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
72 989a8bee Michael Hanselmann
73 989a8bee Michael Hanselmann
    if newer_than is None:
74 989a8bee Michael Hanselmann
      return self._log
75 989a8bee Michael Hanselmann
76 989a8bee Michael Hanselmann
    return self._log[newer_than:]
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
79 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
80 989a8bee Michael Hanselmann
  def testStatus(self):
81 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
82 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
83 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 989a8bee Michael Hanselmann
85 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
86 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 989a8bee Michael Hanselmann
88 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
89 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 989a8bee Michael Hanselmann
91 989a8bee Michael Hanselmann
    # job.id is used by checker
92 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
93 989a8bee Michael Hanselmann
94 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
95 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
96 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
97 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
98 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
99 989a8bee Michael Hanselmann
100 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
101 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
102 989a8bee Michael Hanselmann
103 989a8bee Michael Hanselmann
  def testFinalStatus(self):
104 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
105 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
106 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
107 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
108 989a8bee Michael Hanselmann
      # a change immediately
109 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
110 989a8bee Michael Hanselmann
111 989a8bee Michael Hanselmann
  def testLog(self):
112 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
113 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
114 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 989a8bee Michael Hanselmann
116 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
117 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
118 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
119 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
120 989a8bee Michael Hanselmann
121 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
122 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
123 989a8bee Michael Hanselmann
124 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
125 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
126 989a8bee Michael Hanselmann
127 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
128 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
129 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
132 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
133 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
134 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
135 989a8bee Michael Hanselmann
136 989a8bee Michael Hanselmann
137 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
138 989a8bee Michael Hanselmann
  def setUp(self):
139 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
140 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
141 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
142 989a8bee Michael Hanselmann
143 989a8bee Michael Hanselmann
  def tearDown(self):
144 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
145 989a8bee Michael Hanselmann
146 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
147 989a8bee Michael Hanselmann
    try:
148 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
149 989a8bee Michael Hanselmann
    except EnvironmentError, err:
150 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
151 989a8bee Michael Hanselmann
    else:
152 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
153 989a8bee Michael Hanselmann
154 989a8bee Michael Hanselmann
  def testClose(self):
155 989a8bee Michael Hanselmann
    for wait in [False, True]:
156 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
157 989a8bee Michael Hanselmann
      try:
158 989a8bee Michael Hanselmann
        if wait:
159 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
160 989a8bee Michael Hanselmann
      finally:
161 989a8bee Michael Hanselmann
        waiter.Close()
162 989a8bee Michael Hanselmann
163 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
164 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
165 989a8bee Michael Hanselmann
166 989a8bee Michael Hanselmann
  def testChangingFile(self):
167 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
168 989a8bee Michael Hanselmann
    try:
169 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
170 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
171 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
172 989a8bee Michael Hanselmann
    finally:
173 989a8bee Michael Hanselmann
      waiter.Close()
174 989a8bee Michael Hanselmann
175 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
176 989a8bee Michael Hanselmann
177 989a8bee Michael Hanselmann
  def testChangingFile2(self):
178 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
179 989a8bee Michael Hanselmann
    try:
180 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
181 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
182 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
183 989a8bee Michael Hanselmann
184 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
185 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
186 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
187 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
188 989a8bee Michael Hanselmann
    finally:
189 989a8bee Michael Hanselmann
      waiter.Close()
190 989a8bee Michael Hanselmann
191 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
192 989a8bee Michael Hanselmann
193 989a8bee Michael Hanselmann
194 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
195 989a8bee Michael Hanselmann
  def setUp(self):
196 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
197 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
198 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
199 989a8bee Michael Hanselmann
200 989a8bee Michael Hanselmann
  def tearDown(self):
201 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
202 989a8bee Michael Hanselmann
203 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
204 989a8bee Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
205 989a8bee Michael Hanselmann
206 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
207 989a8bee Michael Hanselmann
    return None
208 989a8bee Michael Hanselmann
209 989a8bee Michael Hanselmann
  def testNoChanges(self):
210 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
211 989a8bee Michael Hanselmann
212 989a8bee Michael Hanselmann
    # No change
213 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
214 989a8bee Michael Hanselmann
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
215 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
216 989a8bee Michael Hanselmann
217 989a8bee Michael Hanselmann
    # No previous information
218 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
219 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
220 989a8bee Michael Hanselmann
                     ([constants.JOB_STATUS_WAITLOCK], []))
221 989a8bee Michael Hanselmann
222 989a8bee Michael Hanselmann
  def testLostJob(self):
223 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
224 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
225 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
226 989a8bee Michael Hanselmann
227 989a8bee Michael Hanselmann
228 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
229 6760e4ed Michael Hanselmann
  def test(self):
230 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
231 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
232 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 6760e4ed Michael Hanselmann
234 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
235 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
236 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 6760e4ed Michael Hanselmann
238 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
239 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
240 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 6760e4ed Michael Hanselmann
242 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
243 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
244 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245 6760e4ed Michael Hanselmann
246 6760e4ed Michael Hanselmann
247 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
248 8f5c488d Michael Hanselmann
  def testDefaults(self):
249 8f5c488d Michael Hanselmann
    def _Check(op):
250 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
251 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
252 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
253 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
254 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
255 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
256 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
257 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 8f5c488d Michael Hanselmann
259 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 8f5c488d Michael Hanselmann
    _Check(op1)
261 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 8f5c488d Michael Hanselmann
    _Check(op2)
263 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
264 8f5c488d Michael Hanselmann
265 8f5c488d Michael Hanselmann
  def testPriority(self):
266 8f5c488d Michael Hanselmann
    def _Check(op):
267 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
268 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
269 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
270 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 8f5c488d Michael Hanselmann
272 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
273 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
274 8f5c488d Michael Hanselmann
    _Check(op1)
275 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 8f5c488d Michael Hanselmann
    _Check(op2)
277 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
278 8f5c488d Michael Hanselmann
279 8f5c488d Michael Hanselmann
280 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
281 5f6b0b71 Michael Hanselmann
  def test(self):
282 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
283 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
284 5f6b0b71 Michael Hanselmann
285 8f5c488d Michael Hanselmann
  def testDefaults(self):
286 8f5c488d Michael Hanselmann
    job_id = 4260
287 8f5c488d Michael Hanselmann
    ops = [
288 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
289 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
290 8f5c488d Michael Hanselmann
      ]
291 8f5c488d Michael Hanselmann
292 8f5c488d Michael Hanselmann
    def _Check(job):
293 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
294 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
295 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
296 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
297 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
298 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
299 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
300 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
301 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
302 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
303 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
304 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
305 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
306 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
307 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
308 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
309 8f5c488d Michael Hanselmann
310 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
311 8f5c488d Michael Hanselmann
    _Check(job1)
312 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
313 8f5c488d Michael Hanselmann
    _Check(job2)
314 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
315 8f5c488d Michael Hanselmann
316 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
317 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
318 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
319 c0f6d0d8 Michael Hanselmann
320 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
321 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
322 c0f6d0d8 Michael Hanselmann
323 8f5c488d Michael Hanselmann
  def testPriority(self):
324 8f5c488d Michael Hanselmann
    job_id = 4283
325 8f5c488d Michael Hanselmann
    ops = [
326 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
327 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
328 8f5c488d Michael Hanselmann
      ]
329 8f5c488d Michael Hanselmann
330 8f5c488d Michael Hanselmann
    def _Check(job):
331 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
332 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
333 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
334 8f5c488d Michael Hanselmann
335 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
336 8f5c488d Michael Hanselmann
    _Check(job)
337 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
338 8f5c488d Michael Hanselmann
                            for op in job.ops))
339 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
340 8f5c488d Michael Hanselmann
341 8f5c488d Michael Hanselmann
    # Increase first
342 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
343 8f5c488d Michael Hanselmann
    _Check(job)
344 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
345 8f5c488d Michael Hanselmann
346 8f5c488d Michael Hanselmann
    # Mark opcode as finished
347 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
348 8f5c488d Michael Hanselmann
    _Check(job)
349 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
350 8f5c488d Michael Hanselmann
351 8f5c488d Michael Hanselmann
    # Increase second
352 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
353 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
354 8f5c488d Michael Hanselmann
355 8f5c488d Michael Hanselmann
    # Test increasing first
356 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
357 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
358 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
359 8f5c488d Michael Hanselmann
360 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
361 db5bce34 Michael Hanselmann
    def _Queued(ops):
362 db5bce34 Michael Hanselmann
      # The default status is "queued"
363 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
364 db5bce34 Michael Hanselmann
                              for op in ops))
365 db5bce34 Michael Hanselmann
366 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
367 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITLOCK
368 db5bce34 Michael Hanselmann
369 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
370 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
371 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
372 db5bce34 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITLOCK
373 db5bce34 Michael Hanselmann
374 db5bce34 Michael Hanselmann
    def _Running(ops):
375 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
376 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
377 db5bce34 Michael Hanselmann
      for op in ops[2:]:
378 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
379 db5bce34 Michael Hanselmann
380 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
381 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
382 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
383 db5bce34 Michael Hanselmann
      for op in ops[2:]:
384 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
385 db5bce34 Michael Hanselmann
386 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
387 db5bce34 Michael Hanselmann
      for op in ops:
388 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
389 db5bce34 Michael Hanselmann
390 db5bce34 Michael Hanselmann
    def _Canceled(ops):
391 db5bce34 Michael Hanselmann
      for op in ops:
392 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
393 db5bce34 Michael Hanselmann
394 db5bce34 Michael Hanselmann
    def _Error1(ops):
395 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
396 db5bce34 Michael Hanselmann
        if idx > 3:
397 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
398 db5bce34 Michael Hanselmann
        else:
399 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
400 db5bce34 Michael Hanselmann
401 db5bce34 Michael Hanselmann
    def _Error2(ops):
402 db5bce34 Michael Hanselmann
      for op in ops:
403 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
404 db5bce34 Michael Hanselmann
405 db5bce34 Michael Hanselmann
    def _Success(ops):
406 db5bce34 Michael Hanselmann
      for op in ops:
407 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
408 db5bce34 Michael Hanselmann
409 db5bce34 Michael Hanselmann
    tests = {
410 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
411 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
412 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
413 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
414 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
415 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
416 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
417 db5bce34 Michael Hanselmann
      }
418 db5bce34 Michael Hanselmann
419 db5bce34 Michael Hanselmann
    def _NewJob():
420 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
421 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
422 c0f6d0d8 Michael Hanselmann
                              True)
423 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
424 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
425 db5bce34 Michael Hanselmann
                              for op in job.ops))
426 db5bce34 Michael Hanselmann
      return job
427 db5bce34 Michael Hanselmann
428 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
429 db5bce34 Michael Hanselmann
      sttests = tests[status]
430 db5bce34 Michael Hanselmann
      assert sttests
431 db5bce34 Michael Hanselmann
      for fn in sttests:
432 db5bce34 Michael Hanselmann
        job = _NewJob()
433 db5bce34 Michael Hanselmann
        fn(job.ops)
434 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
435 db5bce34 Michael Hanselmann
436 8f5c488d Michael Hanselmann
437 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
438 be760ba8 Michael Hanselmann
  def __init__(self):
439 b95479a5 Michael Hanselmann
    self._checks = []
440 b95479a5 Michael Hanselmann
    self._notifications = []
441 b95479a5 Michael Hanselmann
    self._waiting = set()
442 b95479a5 Michael Hanselmann
443 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
444 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
445 b95479a5 Michael Hanselmann
446 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
447 b95479a5 Michael Hanselmann
    return len(self._checks)
448 b95479a5 Michael Hanselmann
449 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
450 b95479a5 Michael Hanselmann
    return len(self._waiting)
451 b95479a5 Michael Hanselmann
452 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
453 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
454 b95479a5 Michael Hanselmann
455 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
456 b95479a5 Michael Hanselmann
    return job in self._waiting
457 b95479a5 Michael Hanselmann
458 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
459 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
460 b95479a5 Michael Hanselmann
461 b95479a5 Michael Hanselmann
    assert exp_job == job
462 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
463 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
464 b95479a5 Michael Hanselmann
465 b95479a5 Michael Hanselmann
    (result_status, _) = result
466 b95479a5 Michael Hanselmann
467 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
468 b95479a5 Michael Hanselmann
      self._waiting.add(job)
469 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
470 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
471 b95479a5 Michael Hanselmann
472 b95479a5 Michael Hanselmann
    return result
473 b95479a5 Michael Hanselmann
474 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
475 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
476 b95479a5 Michael Hanselmann
477 b95479a5 Michael Hanselmann
478 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
479 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
480 b95479a5 Michael Hanselmann
    return False
481 b95479a5 Michael Hanselmann
482 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
483 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
484 b95479a5 Michael Hanselmann
485 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
486 b95479a5 Michael Hanselmann
    pass
487 b95479a5 Michael Hanselmann
488 b95479a5 Michael Hanselmann
489 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
490 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
491 be760ba8 Michael Hanselmann
    self._acquired = False
492 ebb2a2a3 Michael Hanselmann
    self._updates = []
493 6a373640 Michael Hanselmann
    self._submitted = []
494 6a373640 Michael Hanselmann
495 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
496 be760ba8 Michael Hanselmann
497 b95479a5 Michael Hanselmann
    if depmgr:
498 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
499 b95479a5 Michael Hanselmann
    else:
500 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
501 b95479a5 Michael Hanselmann
502 be760ba8 Michael Hanselmann
  def IsAcquired(self):
503 be760ba8 Michael Hanselmann
    return self._acquired
504 be760ba8 Michael Hanselmann
505 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
506 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
507 ebb2a2a3 Michael Hanselmann
508 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
509 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
510 6a373640 Michael Hanselmann
511 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
512 be760ba8 Michael Hanselmann
    assert shared == 1
513 be760ba8 Michael Hanselmann
    self._acquired = True
514 be760ba8 Michael Hanselmann
515 be760ba8 Michael Hanselmann
  def release(self):
516 be760ba8 Michael Hanselmann
    assert self._acquired
517 be760ba8 Michael Hanselmann
    self._acquired = False
518 be760ba8 Michael Hanselmann
519 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
520 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
521 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
522 be760ba8 Michael Hanselmann
523 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
524 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
525 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
526 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
527 6a373640 Michael Hanselmann
    return job_ids
528 6a373640 Michael Hanselmann
529 be760ba8 Michael Hanselmann
530 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
531 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
532 ebb2a2a3 Michael Hanselmann
    self._queue = queue
533 be760ba8 Michael Hanselmann
    self._before_start = before_start
534 be760ba8 Michael Hanselmann
    self._after_start = after_start
535 be760ba8 Michael Hanselmann
536 f23db633 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None, priority=None):
537 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
538 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
539 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
540 be760ba8 Michael Hanselmann
541 be760ba8 Michael Hanselmann
    if self._before_start:
542 f23db633 Michael Hanselmann
      self._before_start(timeout, priority)
543 be760ba8 Michael Hanselmann
544 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
545 be760ba8 Michael Hanselmann
546 be760ba8 Michael Hanselmann
    if self._after_start:
547 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
548 be760ba8 Michael Hanselmann
549 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
550 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
551 ebb2a2a3 Michael Hanselmann
552 be760ba8 Michael Hanselmann
    if op.fail:
553 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
554 be760ba8 Michael Hanselmann
555 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
556 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
557 6a373640 Michael Hanselmann
558 be760ba8 Michael Hanselmann
    return op.result
559 be760ba8 Michael Hanselmann
560 be760ba8 Michael Hanselmann
561 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
562 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
563 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
564 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
565 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
566 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
567 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
568 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
569 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
570 be760ba8 Michael Hanselmann
    return job
571 be760ba8 Michael Hanselmann
572 26d3fd2f Michael Hanselmann
573 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
574 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
575 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
576 be760ba8 Michael Hanselmann
                      for op in job.ops)
577 be760ba8 Michael Hanselmann
578 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
579 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
580 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
581 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
582 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
583 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
584 be760ba8 Michael Hanselmann
                      job.start_timestamp,
585 be760ba8 Michael Hanselmann
                      job.end_timestamp])
586 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
587 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
588 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
589 be760ba8 Michael Hanselmann
590 be760ba8 Michael Hanselmann
  def testSuccess(self):
591 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
592 be760ba8 Michael Hanselmann
593 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
594 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
595 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
596 be760ba8 Michael Hanselmann
             for i in range(opcount)]
597 be760ba8 Michael Hanselmann
598 be760ba8 Michael Hanselmann
      # Create job
599 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
600 be760ba8 Michael Hanselmann
601 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
602 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
603 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
604 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
605 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
606 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
607 be760ba8 Michael Hanselmann
608 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
609 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
610 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
611 ebb2a2a3 Michael Hanselmann
612 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
613 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
614 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
615 be760ba8 Michael Hanselmann
616 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
617 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
618 be760ba8 Michael Hanselmann
        self.assertFalse(success)
619 be760ba8 Michael Hanselmann
620 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
621 be760ba8 Michael Hanselmann
622 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
623 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
624 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
625 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
626 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
627 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
628 be760ba8 Michael Hanselmann
          # Last opcode
629 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
630 be760ba8 Michael Hanselmann
        else:
631 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
632 be760ba8 Michael Hanselmann
633 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
634 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
635 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
636 be760ba8 Michael Hanselmann
637 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
638 ebb2a2a3 Michael Hanselmann
639 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
640 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
641 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
642 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
643 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
644 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
645 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
646 be760ba8 Michael Hanselmann
                              for op in job.ops))
647 be760ba8 Michael Hanselmann
648 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
649 be760ba8 Michael Hanselmann
650 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
651 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
652 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
653 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
654 be760ba8 Michael Hanselmann
655 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
656 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
657 be760ba8 Michael Hanselmann
658 be760ba8 Michael Hanselmann
    testdata = [
659 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
660 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
661 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
662 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
663 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
664 be760ba8 Michael Hanselmann
      ]
665 be760ba8 Michael Hanselmann
666 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
667 be760ba8 Michael Hanselmann
      # Prepare opcodes
668 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
669 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
670 be760ba8 Michael Hanselmann
                                       i <= failto))
671 be760ba8 Michael Hanselmann
             for i in range(opcount)]
672 be760ba8 Michael Hanselmann
673 be760ba8 Michael Hanselmann
      # Create job
674 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
675 be760ba8 Michael Hanselmann
676 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
677 be760ba8 Michael Hanselmann
678 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
679 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
680 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
681 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
682 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
683 ebb2a2a3 Michael Hanselmann
        # waitlock to running
684 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
685 ebb2a2a3 Michael Hanselmann
        # Opcode result
686 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
687 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
688 be760ba8 Michael Hanselmann
689 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
690 be760ba8 Michael Hanselmann
          # Last opcode
691 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
692 be760ba8 Michael Hanselmann
          break
693 be760ba8 Michael Hanselmann
694 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
695 be760ba8 Michael Hanselmann
696 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
697 be760ba8 Michael Hanselmann
698 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
699 ebb2a2a3 Michael Hanselmann
700 be760ba8 Michael Hanselmann
      # Check job status
701 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
702 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
703 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
704 be760ba8 Michael Hanselmann
705 be760ba8 Michael Hanselmann
      # Check opcode status
706 be760ba8 Michael Hanselmann
      data = zip(job.ops,
707 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
708 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
709 be760ba8 Michael Hanselmann
710 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
711 be760ba8 Michael Hanselmann
        if idx < failfrom:
712 be760ba8 Michael Hanselmann
          assert not op.input.fail
713 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
714 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
715 be760ba8 Michael Hanselmann
        elif idx <= failto:
716 be760ba8 Michael Hanselmann
          assert op.input.fail
717 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
718 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
719 be760ba8 Michael Hanselmann
        else:
720 be760ba8 Michael Hanselmann
          assert not op.input.fail
721 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
722 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
723 be760ba8 Michael Hanselmann
724 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
725 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
726 be760ba8 Michael Hanselmann
727 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
728 be760ba8 Michael Hanselmann
729 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
730 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
731 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
732 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
733 be760ba8 Michael Hanselmann
734 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
735 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
736 be760ba8 Michael Hanselmann
737 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
738 be760ba8 Michael Hanselmann
           for i in range(5)]
739 be760ba8 Michael Hanselmann
740 be760ba8 Michael Hanselmann
    # Create job
741 be760ba8 Michael Hanselmann
    job_id = 17045
742 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
743 be760ba8 Michael Hanselmann
744 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
745 be760ba8 Michael Hanselmann
746 be760ba8 Michael Hanselmann
    # Mark as cancelled
747 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
748 be760ba8 Michael Hanselmann
    self.assert_(success)
749 be760ba8 Michael Hanselmann
750 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
751 ebb2a2a3 Michael Hanselmann
752 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
753 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
754 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
755 be760ba8 Michael Hanselmann
                            for op in job.ops))
756 be760ba8 Michael Hanselmann
757 66bd7445 Michael Hanselmann
    # Serialize to check for differences
758 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
759 66bd7445 Michael Hanselmann
760 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
761 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
762 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
763 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
764 30c945d0 Michael Hanselmann
765 30c945d0 Michael Hanselmann
    # Check result
766 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
767 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
768 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
769 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
770 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
771 30c945d0 Michael Hanselmann
                                for op in job.ops))
772 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
773 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
774 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
775 30c945d0 Michael Hanselmann
776 66bd7445 Michael Hanselmann
    # Must not have changed or written
777 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
778 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
779 66bd7445 Michael Hanselmann
780 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
781 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
782 30c945d0 Michael Hanselmann
783 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
784 30c945d0 Michael Hanselmann
           for i in range(5)]
785 30c945d0 Michael Hanselmann
786 30c945d0 Michael Hanselmann
    # Create job
787 30c945d0 Michael Hanselmann
    job_id = 8645
788 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
789 30c945d0 Michael Hanselmann
790 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
791 30c945d0 Michael Hanselmann
792 30c945d0 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
793 30c945d0 Michael Hanselmann
794 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
795 30c945d0 Michael Hanselmann
796 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
797 30c945d0 Michael Hanselmann
798 30c945d0 Michael Hanselmann
    # Mark as cancelling
799 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
800 30c945d0 Michael Hanselmann
    self.assert_(success)
801 30c945d0 Michael Hanselmann
802 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
803 30c945d0 Michael Hanselmann
804 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
805 30c945d0 Michael Hanselmann
                            for op in job.ops))
806 30c945d0 Michael Hanselmann
807 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
808 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
809 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
810 be760ba8 Michael Hanselmann
811 be760ba8 Michael Hanselmann
    # Check result
812 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
813 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
814 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
815 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
816 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
817 be760ba8 Michael Hanselmann
                                for op in job.ops))
818 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
819 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
820 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
821 be760ba8 Michael Hanselmann
822 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
823 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
824 be760ba8 Michael Hanselmann
825 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
826 be760ba8 Michael Hanselmann
           for i in range(5)]
827 be760ba8 Michael Hanselmann
828 be760ba8 Michael Hanselmann
    # Create job
829 be760ba8 Michael Hanselmann
    job_id = 11009
830 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
831 be760ba8 Michael Hanselmann
832 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
833 be760ba8 Michael Hanselmann
834 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
835 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
836 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
837 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
838 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
839 be760ba8 Michael Hanselmann
840 be760ba8 Michael Hanselmann
      # Mark as cancelled
841 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
842 be760ba8 Michael Hanselmann
      self.assert_(success)
843 be760ba8 Michael Hanselmann
844 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
845 be760ba8 Michael Hanselmann
                              for op in job.ops))
846 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
847 be760ba8 Michael Hanselmann
848 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
849 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
850 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
851 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
852 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
853 be760ba8 Michael Hanselmann
854 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
855 be760ba8 Michael Hanselmann
856 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
857 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
858 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
859 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
860 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
861 be760ba8 Michael Hanselmann
862 be760ba8 Michael Hanselmann
    # Check result
863 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
864 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
865 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
866 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
867 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
868 be760ba8 Michael Hanselmann
                                for op in job.ops))
869 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
870 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
871 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
872 be760ba8 Michael Hanselmann
873 9e49dfc5 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
874 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
875 9e49dfc5 Michael Hanselmann
876 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
877 9e49dfc5 Michael Hanselmann
           for i in range(5)]
878 9e49dfc5 Michael Hanselmann
879 9e49dfc5 Michael Hanselmann
    # Create job
880 9e49dfc5 Michael Hanselmann
    job_id = 24314
881 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
882 9e49dfc5 Michael Hanselmann
883 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
884 9e49dfc5 Michael Hanselmann
885 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
886 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
887 9e49dfc5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
888 9e49dfc5 Michael Hanselmann
889 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
890 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
891 9e49dfc5 Michael Hanselmann
      self.assert_(success)
892 9e49dfc5 Michael Hanselmann
893 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
894 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
895 9e49dfc5 Michael Hanselmann
896 9e49dfc5 Michael Hanselmann
      # Fake an acquire attempt timing out
897 9e49dfc5 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
898 9e49dfc5 Michael Hanselmann
899 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
900 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
901 9e49dfc5 Michael Hanselmann
902 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
903 9e49dfc5 Michael Hanselmann
904 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
905 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
906 9e49dfc5 Michael Hanselmann
907 9e49dfc5 Michael Hanselmann
    # Check result
908 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
909 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
910 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
911 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
912 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
913 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
914 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
915 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
916 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
917 9e49dfc5 Michael Hanselmann
918 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
919 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
920 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
921 be760ba8 Michael Hanselmann
922 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
923 be760ba8 Michael Hanselmann
           for i in range(3)]
924 be760ba8 Michael Hanselmann
925 be760ba8 Michael Hanselmann
    # Create job
926 be760ba8 Michael Hanselmann
    job_id = 28492
927 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
928 be760ba8 Michael Hanselmann
929 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
930 be760ba8 Michael Hanselmann
931 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
932 be760ba8 Michael Hanselmann
933 be760ba8 Michael Hanselmann
    # Run one opcode
934 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
935 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
936 be760ba8 Michael Hanselmann
937 be760ba8 Michael Hanselmann
    # Job goes back to queued
938 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
939 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
940 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
941 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
942 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
943 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
944 be760ba8 Michael Hanselmann
945 be760ba8 Michael Hanselmann
    # Mark as cancelled
946 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
947 be760ba8 Michael Hanselmann
    self.assert_(success)
948 be760ba8 Michael Hanselmann
949 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
950 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
951 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
952 be760ba8 Michael Hanselmann
953 be760ba8 Michael Hanselmann
    # Check result
954 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
955 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
956 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
957 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
958 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
959 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
960 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
961 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
962 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
963 be760ba8 Michael Hanselmann
964 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
965 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
966 be760ba8 Michael Hanselmann
    # program was restarted
967 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
968 be760ba8 Michael Hanselmann
969 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
970 be760ba8 Michael Hanselmann
971 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
972 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
973 be760ba8 Michael Hanselmann
             for i in range(10)]
974 be760ba8 Michael Hanselmann
975 be760ba8 Michael Hanselmann
      # Create job
976 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
977 be760ba8 Michael Hanselmann
978 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
979 be760ba8 Michael Hanselmann
980 be760ba8 Michael Hanselmann
      for _ in range(successcount):
981 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
982 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
983 be760ba8 Michael Hanselmann
984 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
985 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
986 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
987 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
988 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
989 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
990 be760ba8 Michael Hanselmann
991 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
992 be760ba8 Michael Hanselmann
993 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
994 c0f6d0d8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
995 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
996 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
997 be760ba8 Michael Hanselmann
998 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
999 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1000 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1001 be760ba8 Michael Hanselmann
1002 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1003 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1004 be760ba8 Michael Hanselmann
1005 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1006 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1007 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1008 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1009 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1010 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1011 be760ba8 Michael Hanselmann
1012 be760ba8 Michael Hanselmann
      if remaining == 0:
1013 be760ba8 Michael Hanselmann
        # Last opcode
1014 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1015 be760ba8 Michael Hanselmann
        break
1016 be760ba8 Michael Hanselmann
1017 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1018 be760ba8 Michael Hanselmann
1019 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1020 be760ba8 Michael Hanselmann
1021 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1022 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1023 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1024 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1025 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1026 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1027 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1028 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1029 be760ba8 Michael Hanselmann
                            for op in job.ops))
1030 be760ba8 Michael Hanselmann
1031 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1032 be760ba8 Michael Hanselmann
1033 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1034 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1035 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1036 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1037 be760ba8 Michael Hanselmann
1038 be760ba8 Michael Hanselmann
    # ... also after being restored
1039 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1040 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1041 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1042 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1043 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1044 be760ba8 Michael Hanselmann
1045 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1046 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1047 be760ba8 Michael Hanselmann
1048 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1049 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1050 be760ba8 Michael Hanselmann
1051 be760ba8 Michael Hanselmann
    # Create job
1052 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1053 be760ba8 Michael Hanselmann
1054 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1055 be760ba8 Michael Hanselmann
1056 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1057 be760ba8 Michael Hanselmann
1058 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1059 be760ba8 Michael Hanselmann
1060 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1061 be760ba8 Michael Hanselmann
1062 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1063 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1064 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1065 be760ba8 Michael Hanselmann
1066 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1067 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1068 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1069 be760ba8 Michael Hanselmann
1070 be760ba8 Michael Hanselmann
    messages = {
1071 be760ba8 Michael Hanselmann
      1: [
1072 be760ba8 Michael Hanselmann
        (None, "Hello"),
1073 be760ba8 Michael Hanselmann
        (None, "World"),
1074 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1075 be760ba8 Michael Hanselmann
        ],
1076 be760ba8 Michael Hanselmann
      4: [
1077 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1078 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1079 be760ba8 Michael Hanselmann
        ],
1080 be760ba8 Michael Hanselmann
      }
1081 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1082 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1083 be760ba8 Michael Hanselmann
           for i in range(5)]
1084 be760ba8 Michael Hanselmann
1085 be760ba8 Michael Hanselmann
    # Create job
1086 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1087 be760ba8 Michael Hanselmann
1088 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1089 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1090 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1091 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1092 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1093 be760ba8 Michael Hanselmann
1094 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1095 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1096 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1097 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1098 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1099 be760ba8 Michael Hanselmann
1100 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1101 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1102 be760ba8 Michael Hanselmann
1103 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1104 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1105 be760ba8 Michael Hanselmann
        if log_type:
1106 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1107 be760ba8 Michael Hanselmann
        else:
1108 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1109 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1110 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1111 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1112 be760ba8 Michael Hanselmann
1113 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1114 be760ba8 Michael Hanselmann
1115 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1116 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1117 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1118 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1119 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1120 be760ba8 Michael Hanselmann
1121 be760ba8 Michael Hanselmann
      if remaining == 0:
1122 be760ba8 Michael Hanselmann
        # Last opcode
1123 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1124 be760ba8 Michael Hanselmann
        break
1125 be760ba8 Michael Hanselmann
1126 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1127 be760ba8 Michael Hanselmann
1128 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1129 be760ba8 Michael Hanselmann
1130 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1131 ebb2a2a3 Michael Hanselmann
1132 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1133 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1134 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1135 be760ba8 Michael Hanselmann
1136 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1137 be760ba8 Michael Hanselmann
1138 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1139 be760ba8 Michael Hanselmann
1140 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1141 c0f6d0d8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1142 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1143 be760ba8 Michael Hanselmann
1144 be760ba8 Michael Hanselmann
    # Check each message
1145 be760ba8 Michael Hanselmann
    prevserial = -1
1146 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1147 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1148 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1149 be760ba8 Michael Hanselmann
        if exptype:
1150 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1151 be760ba8 Michael Hanselmann
        else:
1152 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1153 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1154 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1155 be760ba8 Michael Hanselmann
        prevserial = serial
1156 be760ba8 Michael Hanselmann
1157 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1158 be760ba8 Michael Hanselmann
    # Check serial
1159 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1160 be760ba8 Michael Hanselmann
1161 be760ba8 Michael Hanselmann
    # No filter
1162 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1163 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1164 be760ba8 Michael Hanselmann
                      for entry in entries])
1165 be760ba8 Michael Hanselmann
1166 be760ba8 Michael Hanselmann
    # Filter with serial
1167 be760ba8 Michael Hanselmann
    assert count > 3
1168 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1169 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1170 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1171 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1172 be760ba8 Michael Hanselmann
1173 be760ba8 Michael Hanselmann
    # No log message after highest serial
1174 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1175 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1176 be760ba8 Michael Hanselmann
1177 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1178 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1179 6a373640 Michael Hanselmann
1180 6a373640 Michael Hanselmann
    job_id = 15656
1181 6a373640 Michael Hanselmann
    ops = [
1182 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1183 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1184 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1185 6a373640 Michael Hanselmann
                          submit_jobs=[
1186 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1187 6a373640 Michael Hanselmann
                            ]),
1188 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1189 6a373640 Michael Hanselmann
                          submit_jobs=[
1190 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1191 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1192 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1193 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1194 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1195 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1196 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1197 6a373640 Michael Hanselmann
                            ]),
1198 6a373640 Michael Hanselmann
      ]
1199 6a373640 Michael Hanselmann
1200 6a373640 Michael Hanselmann
    # Create job
1201 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1202 6a373640 Michael Hanselmann
1203 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1204 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1205 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1206 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1207 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1208 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1209 6a373640 Michael Hanselmann
1210 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1211 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1212 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1213 6a373640 Michael Hanselmann
1214 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1215 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1216 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1217 6a373640 Michael Hanselmann
1218 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1219 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1220 6a373640 Michael Hanselmann
      self.assertFalse(success)
1221 6a373640 Michael Hanselmann
1222 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1223 6a373640 Michael Hanselmann
1224 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1225 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1226 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1227 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1228 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1229 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1230 6a373640 Michael Hanselmann
        # Last opcode
1231 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1232 6a373640 Michael Hanselmann
      else:
1233 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1234 6a373640 Michael Hanselmann
1235 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1236 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1237 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1238 6a373640 Michael Hanselmann
1239 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1240 6a373640 Michael Hanselmann
1241 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1242 6a373640 Michael Hanselmann
                                        for op in ops
1243 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1244 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1245 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1246 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1247 6a373640 Michael Hanselmann
1248 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1249 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1250 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1251 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1252 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1253 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1254 6a373640 Michael Hanselmann
1255 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1256 6a373640 Michael Hanselmann
1257 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1258 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1259 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1260 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1261 6a373640 Michael Hanselmann
1262 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1263 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1264 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1265 b95479a5 Michael Hanselmann
1266 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1267 b95479a5 Michael Hanselmann
1268 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1269 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1270 b95479a5 Michael Hanselmann
    job_id = 29929
1271 b95479a5 Michael Hanselmann
    ops = [
1272 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1273 b95479a5 Michael Hanselmann
                          depends=[
1274 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1275 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1276 b95479a5 Michael Hanselmann
                            ]),
1277 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1278 b95479a5 Michael Hanselmann
      ]
1279 b95479a5 Michael Hanselmann
1280 b95479a5 Michael Hanselmann
    # Create job
1281 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1282 b95479a5 Michael Hanselmann
1283 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1284 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1285 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1286 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1287 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1288 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1289 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1290 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1291 b95479a5 Michael Hanselmann
1292 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1293 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1294 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1295 b95479a5 Michael Hanselmann
1296 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1297 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1298 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1299 b95479a5 Michael Hanselmann
1300 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1301 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1302 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1303 b95479a5 Michael Hanselmann
1304 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1305 b95479a5 Michael Hanselmann
1306 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1307 b95479a5 Michael Hanselmann
1308 b95479a5 Michael Hanselmann
    counter = itertools.count()
1309 b95479a5 Michael Hanselmann
    while True:
1310 b95479a5 Michael Hanselmann
      attempt = counter.next()
1311 b95479a5 Michael Hanselmann
1312 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1313 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1314 b95479a5 Michael Hanselmann
1315 b95479a5 Michael Hanselmann
      if attempt < 2:
1316 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1317 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1318 b95479a5 Michael Hanselmann
      elif attempt == 2:
1319 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1320 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1321 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1322 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1323 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1324 b95479a5 Michael Hanselmann
      elif attempt < 5:
1325 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1326 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1327 b95479a5 Michael Hanselmann
      elif attempt == 5:
1328 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1329 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1330 b95479a5 Michael Hanselmann
      if attempt == 2:
1331 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1332 b95479a5 Michael Hanselmann
      elif attempt > 5:
1333 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1334 b95479a5 Michael Hanselmann
      else:
1335 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1336 b95479a5 Michael Hanselmann
1337 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1338 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1339 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1340 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1341 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1342 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1343 b95479a5 Michael Hanselmann
1344 b95479a5 Michael Hanselmann
      if attempt < 5:
1345 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1346 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1347 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1348 b95479a5 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1349 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1350 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1351 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1352 b95479a5 Michael Hanselmann
        continue
1353 b95479a5 Michael Hanselmann
1354 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1355 b95479a5 Michael Hanselmann
        # Last opcode
1356 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1357 b95479a5 Michael Hanselmann
        break
1358 b95479a5 Michael Hanselmann
1359 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1360 b95479a5 Michael Hanselmann
1361 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1362 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1363 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1364 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1365 b95479a5 Michael Hanselmann
1366 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1367 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1368 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1369 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1370 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1371 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1372 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1373 b95479a5 Michael Hanselmann
                               for op in job.ops))
1374 b95479a5 Michael Hanselmann
1375 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1376 b95479a5 Michael Hanselmann
1377 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1378 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1379 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1380 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1381 b95479a5 Michael Hanselmann
1382 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1383 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1384 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1385 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1386 b95479a5 Michael Hanselmann
1387 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1388 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1389 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1390 b95479a5 Michael Hanselmann
1391 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1392 b95479a5 Michael Hanselmann
1393 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1394 b95479a5 Michael Hanselmann
    job_id = 30876
1395 b95479a5 Michael Hanselmann
    ops = [
1396 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1397 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1398 b95479a5 Michael Hanselmann
                          depends=[
1399 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1400 b95479a5 Michael Hanselmann
                            ]),
1401 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1402 b95479a5 Michael Hanselmann
      ]
1403 b95479a5 Michael Hanselmann
1404 b95479a5 Michael Hanselmann
    # Create job
1405 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1406 b95479a5 Michael Hanselmann
1407 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1408 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1409 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1410 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1411 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1412 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1413 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1414 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1415 b95479a5 Michael Hanselmann
1416 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1417 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1418 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1419 b95479a5 Michael Hanselmann
1420 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1421 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1422 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1423 b95479a5 Michael Hanselmann
1424 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1425 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1426 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1427 b95479a5 Michael Hanselmann
1428 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1429 b95479a5 Michael Hanselmann
1430 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1431 b95479a5 Michael Hanselmann
1432 b95479a5 Michael Hanselmann
    counter = itertools.count()
1433 b95479a5 Michael Hanselmann
    while True:
1434 b95479a5 Michael Hanselmann
      attempt = counter.next()
1435 b95479a5 Michael Hanselmann
1436 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1437 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1438 b95479a5 Michael Hanselmann
1439 b95479a5 Michael Hanselmann
      if attempt == 0:
1440 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1441 b95479a5 Michael Hanselmann
        pass
1442 b95479a5 Michael Hanselmann
      elif attempt < 4:
1443 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1444 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1445 b95479a5 Michael Hanselmann
      elif attempt == 4:
1446 b95479a5 Michael Hanselmann
        # Other job was cancelled
1447 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1448 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1449 b95479a5 Michael Hanselmann
1450 b95479a5 Michael Hanselmann
      if attempt == 0:
1451 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1452 b95479a5 Michael Hanselmann
      else:
1453 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1454 b95479a5 Michael Hanselmann
1455 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1456 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1457 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1458 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1459 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1460 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1461 b95479a5 Michael Hanselmann
1462 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1463 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1464 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1465 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1466 b95479a5 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1467 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1468 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1469 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1470 b95479a5 Michael Hanselmann
        continue
1471 b95479a5 Michael Hanselmann
1472 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1473 b95479a5 Michael Hanselmann
        # Last opcode
1474 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1475 b95479a5 Michael Hanselmann
        break
1476 b95479a5 Michael Hanselmann
1477 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1478 b95479a5 Michael Hanselmann
1479 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1480 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1481 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1482 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1483 b95479a5 Michael Hanselmann
1484 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1485 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1486 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1487 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1488 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1489 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1490 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1491 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1492 b95479a5 Michael Hanselmann
1493 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1494 b95479a5 Michael Hanselmann
1495 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1496 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1497 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1498 b95479a5 Michael Hanselmann
1499 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1500 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1501 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1502 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1503 b95479a5 Michael Hanselmann
1504 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1505 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1506 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1507 b95479a5 Michael Hanselmann
1508 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1509 b95479a5 Michael Hanselmann
1510 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1511 b95479a5 Michael Hanselmann
    job_id = 11763
1512 b95479a5 Michael Hanselmann
    ops = [
1513 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1514 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1515 b95479a5 Michael Hanselmann
                          depends=[
1516 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1517 b95479a5 Michael Hanselmann
                            ]),
1518 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1519 b95479a5 Michael Hanselmann
      ]
1520 b95479a5 Michael Hanselmann
1521 b95479a5 Michael Hanselmann
    # Create job
1522 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1523 b95479a5 Michael Hanselmann
1524 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1525 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1526 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1527 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1528 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1529 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1530 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1531 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1532 b95479a5 Michael Hanselmann
1533 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1534 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1535 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1536 b95479a5 Michael Hanselmann
1537 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1538 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1539 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1540 b95479a5 Michael Hanselmann
1541 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1542 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1543 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1544 b95479a5 Michael Hanselmann
1545 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1546 b95479a5 Michael Hanselmann
1547 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1548 b95479a5 Michael Hanselmann
1549 b95479a5 Michael Hanselmann
    counter = itertools.count()
1550 b95479a5 Michael Hanselmann
    while True:
1551 b95479a5 Michael Hanselmann
      attempt = counter.next()
1552 b95479a5 Michael Hanselmann
1553 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1554 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1555 b95479a5 Michael Hanselmann
1556 b95479a5 Michael Hanselmann
      if attempt == 0:
1557 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1558 b95479a5 Michael Hanselmann
        pass
1559 b95479a5 Michael Hanselmann
      elif attempt < 4:
1560 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1561 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1562 b95479a5 Michael Hanselmann
      elif attempt == 4:
1563 b95479a5 Michael Hanselmann
        # Other job failed
1564 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1565 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1566 b95479a5 Michael Hanselmann
1567 b95479a5 Michael Hanselmann
      if attempt == 0:
1568 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1569 b95479a5 Michael Hanselmann
      else:
1570 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1571 b95479a5 Michael Hanselmann
1572 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1573 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1574 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1575 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1576 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1577 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1578 b95479a5 Michael Hanselmann
1579 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1580 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1581 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1582 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1583 b95479a5 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1584 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1585 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1586 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1587 b95479a5 Michael Hanselmann
        continue
1588 b95479a5 Michael Hanselmann
1589 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1590 b95479a5 Michael Hanselmann
        # Last opcode
1591 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1592 b95479a5 Michael Hanselmann
        break
1593 b95479a5 Michael Hanselmann
1594 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1595 b95479a5 Michael Hanselmann
1596 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1597 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1598 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1599 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1600 b95479a5 Michael Hanselmann
1601 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1602 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1603 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1604 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1605 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
1606 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
1607 b95479a5 Michael Hanselmann
1608 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
1609 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
1610 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
1611 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1612 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1613 b95479a5 Michael Hanselmann
1614 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1615 b95479a5 Michael Hanselmann
1616 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1617 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1618 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1619 b95479a5 Michael Hanselmann
1620 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1621 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1622 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1623 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1624 b95479a5 Michael Hanselmann
1625 be760ba8 Michael Hanselmann
1626 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
1627 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
1628 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
1629 26d3fd2f Michael Hanselmann
    self.attempts = 0
1630 26d3fd2f Michael Hanselmann
    self.last_timeout = None
1631 26d3fd2f Michael Hanselmann
1632 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
1633 26d3fd2f Michael Hanselmann
    self.attempts += 1
1634 26d3fd2f Michael Hanselmann
    if self.timeouts:
1635 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
1636 26d3fd2f Michael Hanselmann
    else:
1637 26d3fd2f Michael Hanselmann
      timeout = None
1638 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
1639 26d3fd2f Michael Hanselmann
    return timeout
1640 26d3fd2f Michael Hanselmann
1641 26d3fd2f Michael Hanselmann
1642 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1643 26d3fd2f Michael Hanselmann
  def setUp(self):
1644 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
1645 26d3fd2f Michael Hanselmann
    self.job = None
1646 26d3fd2f Michael Hanselmann
    self.curop = None
1647 26d3fd2f Michael Hanselmann
    self.opcounter = None
1648 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
1649 26d3fd2f Michael Hanselmann
    self.retries = 0
1650 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
1651 26d3fd2f Michael Hanselmann
    self.prev_prio = None
1652 5fd6b694 Michael Hanselmann
    self.prev_status = None
1653 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
1654 26d3fd2f Michael Hanselmann
    self.gave_lock = None
1655 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
1656 26d3fd2f Michael Hanselmann
1657 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
1658 26d3fd2f Michael Hanselmann
    job = self.job
1659 26d3fd2f Michael Hanselmann
1660 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
1661 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
1662 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1663 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1664 5fd6b694 Michael Hanselmann
1665 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1666 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1667 26d3fd2f Michael Hanselmann
1668 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
1669 26d3fd2f Michael Hanselmann
1670 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1671 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
1672 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
1673 26d3fd2f Michael Hanselmann
1674 26d3fd2f Michael Hanselmann
    self.gave_lock = True
1675 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
1676 26d3fd2f Michael Hanselmann
1677 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
1678 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1679 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
1680 26d3fd2f Michael Hanselmann
      assert self.retries == 7
1681 26d3fd2f Michael Hanselmann
      self.retries = 0
1682 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
1683 26d3fd2f Michael Hanselmann
      return
1684 26d3fd2f Michael Hanselmann
1685 26d3fd2f Michael Hanselmann
    if self.retries > 0:
1686 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
1687 26d3fd2f Michael Hanselmann
      self.retries -= 1
1688 26d3fd2f Michael Hanselmann
      self.gave_lock = False
1689 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1690 26d3fd2f Michael Hanselmann
1691 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1692 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1693 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
1694 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
1695 26d3fd2f Michael Hanselmann
1696 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
1697 26d3fd2f Michael Hanselmann
    job = self.job
1698 26d3fd2f Michael Hanselmann
1699 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
1700 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1701 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1702 5fd6b694 Michael Hanselmann
1703 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1704 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1705 26d3fd2f Michael Hanselmann
1706 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
1707 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
1708 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
1709 26d3fd2f Michael Hanselmann
1710 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1711 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1712 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1713 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
1714 26d3fd2f Michael Hanselmann
1715 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1716 26d3fd2f Michael Hanselmann
    job = self.job
1717 26d3fd2f Michael Hanselmann
1718 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1719 26d3fd2f Michael Hanselmann
1720 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1721 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1722 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1723 26d3fd2f Michael Hanselmann
1724 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1725 26d3fd2f Michael Hanselmann
      # Normal retry
1726 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1727 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1728 26d3fd2f Michael Hanselmann
1729 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1730 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1731 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1732 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1733 26d3fd2f Michael Hanselmann
1734 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1735 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1736 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1737 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1738 26d3fd2f Michael Hanselmann
1739 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1740 26d3fd2f Michael Hanselmann
1741 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1742 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1743 26d3fd2f Michael Hanselmann
1744 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1745 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1746 26d3fd2f Michael Hanselmann
      self.retries = 0
1747 26d3fd2f Michael Hanselmann
1748 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1749 26d3fd2f Michael Hanselmann
      # Normal retry
1750 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1751 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1752 26d3fd2f Michael Hanselmann
1753 26d3fd2f Michael Hanselmann
    else:
1754 26d3fd2f Michael Hanselmann
      timeouts = []
1755 26d3fd2f Michael Hanselmann
      self.retries = 0
1756 26d3fd2f Michael Hanselmann
1757 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1758 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1759 26d3fd2f Michael Hanselmann
1760 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1761 26d3fd2f Michael Hanselmann
1762 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1763 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1764 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1765 26d3fd2f Michael Hanselmann
1766 26d3fd2f Michael Hanselmann
    return ts
1767 26d3fd2f Michael Hanselmann
1768 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1769 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1770 26d3fd2f Michael Hanselmann
           for i in range(10)]
1771 26d3fd2f Michael Hanselmann
1772 26d3fd2f Michael Hanselmann
    # Create job
1773 26d3fd2f Michael Hanselmann
    job_id = 15801
1774 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1775 26d3fd2f Michael Hanselmann
    self.job = job
1776 26d3fd2f Michael Hanselmann
1777 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1778 26d3fd2f Michael Hanselmann
1779 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1780 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
1781 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1782 26d3fd2f Michael Hanselmann
1783 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1784 26d3fd2f Michael Hanselmann
1785 5fd6b694 Michael Hanselmann
    while True:
1786 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1787 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1788 26d3fd2f Michael Hanselmann
1789 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1790 5fd6b694 Michael Hanselmann
1791 5fd6b694 Michael Hanselmann
      if self.curop is not None:
1792 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
1793 5fd6b694 Michael Hanselmann
1794 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
1795 5fd6b694 Michael Hanselmann
1796 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1797 5fd6b694 Michael Hanselmann
      assert self.curop is not None
1798 5fd6b694 Michael Hanselmann
1799 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1800 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
1801 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1802 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1803 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1804 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1805 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
1806 5fd6b694 Michael Hanselmann
1807 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1808 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1809 26d3fd2f Michael Hanselmann
        break
1810 26d3fd2f Michael Hanselmann
1811 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1812 26d3fd2f Michael Hanselmann
1813 5fd6b694 Michael Hanselmann
      if self.curop == 0:
1814 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
1815 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
1816 5fd6b694 Michael Hanselmann
1817 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1818 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
1819 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1820 26d3fd2f Michael Hanselmann
      else:
1821 5fd6b694 Michael Hanselmann
        # Did not get locks
1822 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1823 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1824 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1825 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1826 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1827 5fd6b694 Michael Hanselmann
1828 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
1829 5fd6b694 Michael Hanselmann
        # updated
1830 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
1831 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1832 5fd6b694 Michael Hanselmann
1833 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1834 26d3fd2f Michael Hanselmann
1835 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1836 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1837 26d3fd2f Michael Hanselmann
1838 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1839 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1840 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1841 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1842 26d3fd2f Michael Hanselmann
1843 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1845 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1846 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1847 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1848 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1849 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1850 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1851 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1852 26d3fd2f Michael Hanselmann
1853 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1854 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1855 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1856 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1857 26d3fd2f Michael Hanselmann
1858 26d3fd2f Michael Hanselmann
1859 b95479a5 Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
1860 b95479a5 Michael Hanselmann
  class _FakeJob:
1861 b95479a5 Michael Hanselmann
    def __init__(self, job_id):
1862 b95479a5 Michael Hanselmann
      self.id = str(job_id)
1863 b95479a5 Michael Hanselmann
1864 b95479a5 Michael Hanselmann
  def setUp(self):
1865 b95479a5 Michael Hanselmann
    self._status = []
1866 b95479a5 Michael Hanselmann
    self._queue = []
1867 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1868 b95479a5 Michael Hanselmann
1869 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
1870 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
1871 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
1872 b95479a5 Michael Hanselmann
    return result
1873 b95479a5 Michael Hanselmann
1874 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
1875 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
1876 b95479a5 Michael Hanselmann
1877 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
1878 b95479a5 Michael Hanselmann
    job = self._FakeJob(17697)
1879 b95479a5 Michael Hanselmann
    job_id = str(28625)
1880 b95479a5 Michael Hanselmann
1881 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1882 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1883 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1884 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1885 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1886 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1887 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1888 b95479a5 Michael Hanselmann
      job_id: set([job]),
1889 b95479a5 Michael Hanselmann
      })
1890 b95479a5 Michael Hanselmann
1891 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1892 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1893 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
1894 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1895 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1896 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1897 b95479a5 Michael Hanselmann
1898 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
1899 b95479a5 Michael Hanselmann
    job = self._FakeJob(5278)
1900 b95479a5 Michael Hanselmann
    job_id = str(9610)
1901 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
1902 b95479a5 Michael Hanselmann
1903 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1904 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1905 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1906 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1907 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1908 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1909 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1910 b95479a5 Michael Hanselmann
      job_id: set([job]),
1911 b95479a5 Michael Hanselmann
      })
1912 b95479a5 Michael Hanselmann
1913 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1914 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1915 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1916 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1917 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1918 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1919 b95479a5 Michael Hanselmann
1920 b95479a5 Michael Hanselmann
  def testRequireError(self):
1921 b95479a5 Michael Hanselmann
    job = self._FakeJob(21459)
1922 b95479a5 Michael Hanselmann
    job_id = str(25519)
1923 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
1924 b95479a5 Michael Hanselmann
1925 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1926 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1927 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1928 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1929 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1930 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1931 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1932 b95479a5 Michael Hanselmann
      job_id: set([job]),
1933 b95479a5 Michael Hanselmann
      })
1934 b95479a5 Michael Hanselmann
1935 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1936 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1937 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1938 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1939 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1940 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1941 b95479a5 Michael Hanselmann
1942 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
1943 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
1944 b95479a5 Michael Hanselmann
1945 b95479a5 Michael Hanselmann
    for end_status in dep_status:
1946 b95479a5 Michael Hanselmann
      job = self._FakeJob(21343)
1947 b95479a5 Michael Hanselmann
      job_id = str(14609)
1948 b95479a5 Michael Hanselmann
1949 b95479a5 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1950 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1951 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
1952 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
1953 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
1954 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
1955 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
1956 b95479a5 Michael Hanselmann
        job_id: set([job]),
1957 b95479a5 Michael Hanselmann
        })
1958 b95479a5 Michael Hanselmann
1959 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
1960 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1961 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
1962 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
1963 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
1964 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
1965 b95479a5 Michael Hanselmann
1966 b95479a5 Michael Hanselmann
  def testNotify(self):
1967 b95479a5 Michael Hanselmann
    job = self._FakeJob(8227)
1968 b95479a5 Michael Hanselmann
    job_id = str(4113)
1969 b95479a5 Michael Hanselmann
1970 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1971 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1972 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1973 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1974 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1975 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1976 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1977 b95479a5 Michael Hanselmann
      job_id: set([job]),
1978 b95479a5 Michael Hanselmann
      })
1979 b95479a5 Michael Hanselmann
1980 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
1981 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1982 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
1983 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1984 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
1985 b95479a5 Michael Hanselmann
1986 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
1987 b95479a5 Michael Hanselmann
    job = self._FakeJob(10102)
1988 b95479a5 Michael Hanselmann
    job_id = str(1271)
1989 b95479a5 Michael Hanselmann
1990 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1991 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1992 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
1993 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1994 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1995 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1996 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1997 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1998 b95479a5 Michael Hanselmann
      job_id: set([job]),
1999 b95479a5 Michael Hanselmann
      })
2000 b95479a5 Michael Hanselmann
2001 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2002 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2003 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2004 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2005 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2006 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2007 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2008 b95479a5 Michael Hanselmann
2009 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2010 b95479a5 Michael Hanselmann
    job = self._FakeJob(24273)
2011 b95479a5 Michael Hanselmann
    job_id = str(23885)
2012 b95479a5 Michael Hanselmann
2013 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2014 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2015 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2016 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2017 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2018 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2019 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2020 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2021 b95479a5 Michael Hanselmann
      job_id: set([job]),
2022 b95479a5 Michael Hanselmann
      })
2023 b95479a5 Michael Hanselmann
2024 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2025 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2026 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2027 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2028 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2029 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2030 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2031 b95479a5 Michael Hanselmann
2032 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2033 b95479a5 Michael Hanselmann
    job = self._FakeJob(224)
2034 b95479a5 Michael Hanselmann
    job_id = str(3081)
2035 b95479a5 Michael Hanselmann
2036 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2037 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2038 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2039 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2040 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2041 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2042 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2043 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2044 b95479a5 Michael Hanselmann
      job_id: set(),
2045 b95479a5 Michael Hanselmann
      })
2046 b95479a5 Michael Hanselmann
2047 b95479a5 Michael Hanselmann
    # Force cleanup
2048 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2049 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2050 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2051 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2052 b95479a5 Michael Hanselmann
2053 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2054 b95479a5 Michael Hanselmann
    job = self._FakeJob(18937)
2055 b95479a5 Michael Hanselmann
2056 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2057 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2058 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2059 b95479a5 Michael Hanselmann
2060 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2061 b95479a5 Michael Hanselmann
    job = self._FakeJob(30540)
2062 b95479a5 Michael Hanselmann
    job_id = str(23769)
2063 b95479a5 Michael Hanselmann
2064 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2065 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2066 b95479a5 Michael Hanselmann
2067 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2068 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2069 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2070 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2071 b95479a5 Michael Hanselmann
2072 b95479a5 Michael Hanselmann
2073 989a8bee Michael Hanselmann
if __name__ == "__main__":
2074 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()